LangGraph Accounts Payable Workflow with HITL Approval

Build a LangGraph AP workflow end-to-end: StateGraph, conditional edges, interrupt() approval gate, Postgres checkpointer, idempotency rules, and ERP post.

Published
Updated
Reading Time
28 min
Topics:
API & Developer IntegrationLangGraphAP automationhuman-in-the-loopPythonAI agents

A LangGraph accounts payable workflow models AP as what it actually is: a state graph with branches. Typed nodes for extraction, validation, three-way match, conditional routing, human approval, and ERP posting share state through a TypedDict and are wired together with add_conditional_edges. Human approval gates use interrupt() to pause the graph and Command(resume=...) to continue once an approver responds, while a PostgresSaver checkpointer persists state across the hours or days an approval can take. Because LangGraph restarts the entire node on resume, any side effects — Slack messages, audit writes, ERP posts — sit after the interrupt, not before.

That mechanical shape matches the AP workflow itself. An invoice arrives. Something extracts the data. Something validates the totals and dates. Something matches against the purchase order and the receipt. Something decides whether the invoice clears automatically, needs a human approver, or has to escalate. A human, when involved, may not respond for two days. Something posts to the ERP. Something writes an audit trail. A single LLM call cannot hold that — too many branches, too much intermediate state, too long a pause in the middle. A state graph can.

This guide builds that graph end-to-end against a vendor invoice PDF, using langgraph==1.x (current 1.2.0 as of May 2026). By the end, you have a runnable graph that ingests an invoice URL, calls the extraction service inside a node, validates totals and tax, runs a three-way match against the PO and receipt, conditionally routes to auto-approve, human-approve, or escalate, pauses at the approval gate, persists state to Postgres, resumes when the approver replies days later, and posts idempotently to the ERP. Seven nodes carry the work: extract_node, validate_node, match_node, routing_decision, human_approve_node, post_to_erp_node, and escalate_node. The rest of the article builds and wires them.

The framing throughout is graph-first. Where it helps, this article references the broader invoice processing pipeline architecture that an AP graph sits inside, but the focus here is the LangGraph primitives and their AP wiring, not the surrounding pipeline.

The AP state schema as a TypedDict

Every node in a LangGraph state graph reads from and writes to a single shared state object. The shape of that object is the workflow's data contract — get it right early and the nodes write themselves; get it wrong and every node fights the schema. For AP, the realistic shape carries the invoice payload, the validation result, the two match results, the approver record, the ERP result, and an audit trail that accumulates across every node.

from typing import TypedDict, Literal, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END


class APState(TypedDict):
    invoice_url: str
    invoice_data: dict | None
    validation_errors: list[str]
    po_match: dict | None
    receipt_match: dict | None
    approver_id: str | None
    approval_decision: Literal["approved", "rejected"] | None
    escalation_reason: str | None
    erp_post_id: str | None
    audit_trail: Annotated[list[dict], add]

Each field has a node that owns its write. extract_node populates invoice_data from the PDF at invoice_url. validate_node populates validation_errors — empty list when the invoice is clean. match_node populates po_match and receipt_match, each a dict that carries a clean: bool and a differences: list[dict]. The routing function reads those three results and dispatches the next node — no state write required. human_approve_node writes approver_id and approval_decision when a human gets involved. post_to_erp_node writes erp_post_id once the invoice clears. escalate_node writes escalation_reason when the graph fails the invoice out. Every node appends to audit_trail.

The Optional-via-None shape on most fields is deliberate. Nodes that haven't run yet leave their outputs at None, which makes the routing function's checks straightforward and obvious — if state["validation_errors"]: reads as "are there errors yet?", if state["po_match"] reads as "has match run?". No sentinel values, no exception handling for unset fields.

The one field that needs more thought is audit_trail. Every node appends to it. If the graph stays purely sequential, the simple pattern works — each node reads state["audit_trail"], builds a new list with the appended entry, and returns it. But if you ever parallelize — running validate_node and match_node concurrently, for instance, since both only read invoice_data — two nodes returning a fresh list each overwrite each other's writes. The fix is the reducer: Annotated[list[dict], add] tells LangGraph to merge concurrent writes by appending rather than replacing. Adding the reducer up front, even on a sequential graph, costs nothing and removes a silent failure mode if you later parallelize.

The imports above are the full set the schema needs: TypedDict and Literal for the type definitions, Annotated and operator.add for the reducer, and the graph primitives (StateGraph, START, END) that the next sections will use.

The seven nodes that do the AP work

A LangGraph node is a plain Python function that takes the state dict and returns a partial state dict; LangGraph merges the returned keys back into state for the next node to read. Because audit_trail carries the add reducer from the prior section, every node returns its single new entry as a one-element list and LangGraph appends it for you. Each node below stays focused on its own concern; routing logic stays out of node bodies and into the graph topology, where the next section wires it.

extract_node

The extract node calls the extraction service against the invoice PDF and writes the structured result into invoice_data. The cleanest production shape is the official Python SDK, which wraps upload, submit, poll, and download into a single blocking call.

import os
import json
import tempfile
from urllib.parse import urlparse
import httpx
from invoicedataextraction import InvoiceDataExtraction

client = InvoiceDataExtraction(
    api_key=os.environ["INVOICE_DATA_EXTRACTION_API_KEY"],
)

EXTRACTION_PROMPT = (
    "Extract invoice_number, invoice_date, vendor_name, vendor_id, "
    "po_number, net_amount, tax, grand_total. One row per invoice."
)


def extract_node(state: APState) -> dict:
    """Download the PDF, run extraction, return the structured invoice_data."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, os.path.basename(urlparse(state["invoice_url"]).path))
        with httpx.stream("GET", state["invoice_url"]) as response:
            response.raise_for_status()
            with open(local_path, "wb") as f:
                for chunk in response.iter_bytes():
                    f.write(chunk)

        result = client.extract(
            files=[local_path],
            prompt=EXTRACTION_PROMPT,
            output_structure="per_invoice",
            download={"formats": ["json"], "output_path": tmp_dir},
        )

        json_path = os.path.join(tmp_dir, "extraction.json")
        with open(json_path) as f:
            extracted = json.load(f)

    invoice_data = extracted["rows"][0]
    return {
        "invoice_data": invoice_data,
        "audit_trail": [{"step": "extract", "extraction_id": result["extraction_id"]}],
    }

Two SDK details matter here. First, the SDK takes local file paths rather than URLs or in-memory bytes, so a node receiving an HTTP URL writes the file to a temp path before calling extract. Second, client.extract(...) is synchronous — it blocks until the extraction reaches a terminal state and returns once the JSON output is on disk. The node returns immediately after; no separate poll node is needed. The extraction service the node calls is the same production-ready invoice data extraction API the broader product runs on; teams integrating it for the first time from outside a LangGraph workflow should start with the invoice extraction API quickstart.

validate_node

The validate node runs arithmetic, date, tax, and required-field checks against invoice_data and writes any failures into validation_errors. Empty list means clean; non-empty means the graph will route to escalate.

from datetime import date, datetime
from decimal import Decimal


TOLERANCE = Decimal("0.02")


def run_validation_rules(invoice: dict) -> list[str]:
    errors: list[str] = []

    for required in ("invoice_number", "invoice_date", "vendor_id", "grand_total"):
        if not invoice.get(required):
            errors.append(f"missing required field: {required}")

    try:
        inv_date = datetime.strptime(invoice["invoice_date"], "%Y-%m-%d").date()
        if inv_date > date.today():
            errors.append("invoice_date is in the future")
    except (KeyError, ValueError):
        errors.append("invoice_date is not in YYYY-MM-DD format")

    net = Decimal(str(invoice.get("net_amount", "0")))
    tax = Decimal(str(invoice.get("tax", "0")))
    total = Decimal(str(invoice.get("grand_total", "0")))
    if abs((net + tax) - total) > TOLERANCE:
        errors.append(f"net + tax != grand_total: {net} + {tax} != {total}")

    if invoice.get("vendor_id") and not vendor_master.exists(invoice["vendor_id"]):
        errors.append(f"vendor {invoice['vendor_id']} not in vendor master")

    return errors


def validate_node(state: APState) -> dict:
    errors = run_validation_rules(state["invoice_data"])
    return {
        "validation_errors": errors,
        "audit_trail": [{"step": "validate", "error_count": len(errors)}],
    }

The rule set above is the minimum useful surface — required fields, date sanity, arithmetic, vendor master lookup. Real AP teams extend it with currency consistency, PO format checks, duplicate-invoice detection by (vendor_id, invoice_number), tax-rate sanity against the vendor's expected jurisdiction, and tolerance bands on line-item rollups. The validating extracted invoice data in API workflows article covers the validation patterns more deeply; the goal in validate_node is to surface failures cleanly so the routing function can branch on them.

match_node

The match node looks up the purchase order by po_number from invoice_data, looks up the receipt against the PO, and runs a three-way diff comparing the invoice line items against PO commitments and receipt quantities. Each match result carries a clean: bool and a differences: list[dict] so downstream logic can branch on po_match["clean"] and receipt_match["clean"] without re-running the comparison.

def match_node(state: APState) -> dict:
    invoice = state["invoice_data"]
    po = po_repository.find_by_number(invoice["po_number"])

    if not po:
        return {
            "po_match": {"clean": False, "differences": [{"reason": "po_not_found"}]},
            "receipt_match": None,
            "audit_trail": [{"step": "match", "po_found": False}],
        }

    receipt = receipt_repository.find_by_po(po["id"])
    po_diff = compare_invoice_to_po(invoice, po)
    receipt_diff = compare_invoice_to_receipt(invoice, receipt) if receipt else None

    return {
        "po_match": {
            "clean": not po_diff,
            "differences": po_diff,
            "po_id": po["id"],
        },
        "receipt_match": {
            "clean": receipt is not None and not receipt_diff,
            "differences": receipt_diff or [],
            "receipt_id": receipt["id"] if receipt else None,
        },
        "audit_trail": [{"step": "match", "po_id": po["id"], "clean": not po_diff}],
    }

Three-way matching as an AP process — what fields to compare, what tolerance to apply, how to handle partial receipts — is its own discipline that this article doesn't try to redo. The node above is the LangGraph wiring; the matching logic itself lives in compare_invoice_to_po and compare_invoice_to_receipt and follows your team's existing match rules.

routing_decision

The routing function is the conditional-edges driver. It returns a string naming the next node based on the state at the end of match_node. The order of checks below matters: validation failures escalate first, then PO match failures, then the auto-approve threshold, then the human-approve fall-through.

AUTO_APPROVE_THRESHOLD = Decimal("5000")


def routing_decision(state: APState) -> str:
    if state["validation_errors"]:
        return "escalate"

    po_match = state.get("po_match")
    if po_match is None or not po_match["clean"]:
        return "escalate"

    grand_total = Decimal(str(state["invoice_data"]["grand_total"]))
    if grand_total < AUTO_APPROVE_THRESHOLD:
        return "auto_approve"

    return "human_approve"

The threshold is a plain module constant here for readability. In production, it's table-driven — different vendors, expense categories, or cost centers get different thresholds, and an admin UI updates them without redeploying the graph. The routing function reads the threshold from wherever your config lives; the return-string contract to LangGraph stays the same.

human_approve_node

The human-approve node is the one that pauses the graph and waits for a real person. Its body is short, but its semantics — interrupt() and Command(resume=...) — are the most important mechanics in the article. The interrupt-and-resume section below builds it in full; for now it has the same partial-state-dict shape as the others.

post_to_erp_node

The post-to-ERP node calls the ERP API to record the approved invoice. The critical detail is the idempotency key: derived from (vendor_id, invoice_number), passed to the ERP API in the Idempotency-Key header, so a retry of the same logical post is a no-op upstream rather than a duplicate bill. Why a retry can happen at all — and why writing this key matters more than it looks — is what the idempotency section below covers.

def post_to_erp_node(state: APState) -> dict:
    invoice = state["invoice_data"]
    idempotency_key = f"{invoice['vendor_id']}:{invoice['invoice_number']}"
    result = erp_client.post_invoice(
        invoice=invoice,
        approver_id=state.get("approver_id"),
        idempotency_key=idempotency_key,
    )
    return {
        "erp_post_id": result["id"],
        "audit_trail": [
            {"step": "post_to_erp", "erp_post_id": result["id"], "idempotency_key": idempotency_key}
        ],
    }

escalate_node

The escalate node is the terminal failure path. It assembles a human-readable reason from validation_errors and any po_match or receipt_match differences, notifies the AP team through whatever channel they use (Slack webhook, ticket, email), and returns.

def escalate_node(state: APState) -> dict:
    reasons: list[str] = list(state["validation_errors"])
    po_match = state.get("po_match")
    if po_match and not po_match["clean"]:
        reasons.extend(d.get("reason", str(d)) for d in po_match.get("differences", []))

    reason = "; ".join(reasons) or "unknown"
    ap_team_notifier.send(
        invoice=state["invoice_data"],
        reason=reason,
    )
    return {
        "escalation_reason": reason,
        "audit_trail": [{"step": "escalate", "reason": reason}],
    }

Wiring nodes together with conditional edges

With seven node functions defined, the graph itself is small. A StateGraph registers each node by name, the linear edges run extract → validate → match, then add_conditional_edges dispatches from match to one of three branches based on the routing function's return string.

from langgraph.graph import StateGraph, START, END

builder = StateGraph(APState)

builder.add_node("extract", extract_node)
builder.add_node("validate", validate_node)
builder.add_node("match", match_node)
builder.add_node("human_approve", human_approve_node)
builder.add_node("post_to_erp", post_to_erp_node)
builder.add_node("escalate", escalate_node)

builder.add_edge(START, "extract")
builder.add_edge("extract", "validate")
builder.add_edge("validate", "match")

builder.add_conditional_edges(
    "match",
    routing_decision,
    {
        "auto_approve": "post_to_erp",
        "human_approve": "human_approve",
        "escalate": "escalate",
    },
)

builder.add_edge("human_approve", "post_to_erp")
builder.add_edge("post_to_erp", END)
builder.add_edge("escalate", END)

graph = builder.compile()

add_conditional_edges is the entire AP control flow expressed as data. The routing function returns one of three strings; the dict literal maps each string to the next node. The auto-approve and human-approve branches converge at post_to_erp — the same posting code runs whether the approval was automatic or human, which keeps the ERP integration in one place. Escalation flows to END because escalated invoices don't post until a human resolves the underlying issue, at which point a new graph run starts against the corrected invoice.

The trade-off this pattern makes is keeping routing logic at the graph layer rather than inside node bodies. The alternative is routing inside a node using Command(goto=...) — the node returns a Command instance instead of a partial state dict, and the goto field names the next node. That pattern is useful when the routing target depends on data the node computed alongside its state update, but for AP control flow the function-and-dict version is more readable: the graph topology is visible at a glance, the routing rules are testable in isolation as a plain Python function, and the next person reading the code doesn't have to grep through node bodies to find where the decisions live.

When the AP team needs to process a thousand invoices in parallel rather than one at a time, the graph itself does not change. The orchestration layer above the graph invokes graph.invoke(...) concurrently across worker threads with one thread_id per invoice, and LangGraph's checkpointer (set up in the durability section below) isolates state per thread. The deployment shape for that scale — workers, queues, retry, dead-letter handling — sits at the layer above the graph; see batch invoice processing API architecture for the surrounding pattern.

The interrupt() approval gate and Command(resume=...) cycle

The human-approve node is where the graph stops being a function and starts being a workflow. interrupt() pauses graph execution, surfaces a payload to the approval UI, and waits — possibly for days — until the approval handler calls graph.invoke(...) again with a Command(resume=...) against the same thread_id.

from langgraph.types import interrupt, Command


def human_approve_node(state: APState) -> dict:
    decision = interrupt({
        "type": "ap_approval_request",
        "invoice_number": state["invoice_data"]["invoice_number"],
        "vendor": state["invoice_data"]["vendor_name"],
        "grand_total": state["invoice_data"]["grand_total"],
        "po_match": state["po_match"],
        "question": "Approve, reject, or request more info?",
    })
    return {
        "approval_decision": decision["choice"],
        "approver_id": decision["approver_id"],
        "audit_trail": [
            {"step": "human_approve", "decision": decision}
        ],
    }

At runtime, the interrupt(...) call does three things. It serializes the payload dict and attaches it to the graph's interrupt list. It checkpoints the in-flight state through the checkpointer. Then it raises a special internal exception that LangGraph catches and uses to return control to whoever called graph.invoke(...). The node body after interrupt(...) does not execute on this initial run — it will only execute after a resume.

The initial run looks like this. The caller writes the starting state, picks a thread_id deterministic enough that the resume handler can reconstruct it (typically the invoice's primary key in your system), and invokes the graph.

config = {"configurable": {"thread_id": f"ap-invoice-{invoice_id}"}}

result = graph.invoke(
    {
        "invoice_url": pdf_url,
        "validation_errors": [],
        "audit_trail": [],
    },
    config=config,
)

interrupt_payload = result["__interrupt__"][0].value
# {
#   "type": "ap_approval_request",
#   "invoice_number": "INV-2026-00481",
#   "vendor": "Acme Components Ltd",
#   "grand_total": 12480.50,
#   "po_match": {"clean": True, "differences": [], "po_id": "PO-9921"},
#   "question": "Approve, reject, or request more info?",
# }

approval_ui.post_approval_request(thread_id=config["configurable"]["thread_id"], payload=interrupt_payload)

The approval UI — whatever it is — receives the payload, records the thread_id, and shows the invoice to a human. Some teams use Slack interactive messages with approve and reject buttons; some use a web form behind an internal app; some parse approver email replies. The article's scope ends at the LangGraph boundary; the UI choice is the team's.

Hours or days later, the approval handler receives the human's response and resumes the graph. The handler reads the same thread_id, constructs the config, and calls graph.invoke(...) again — but the input this time is a Command(resume=...) carrying the approver's response.

def handle_approval_callback(thread_id: str, choice: str, approver_id: str) -> None:
    config = {"configurable": {"thread_id": thread_id}}
    graph.invoke(
        Command(resume={"choice": choice, "approver_id": approver_id}),
        config=config,
    )
    # graph resumes at human_approve_node and continues to post_to_erp (or END if rejected)

When the resume call lands, LangGraph reads the checkpointed state from the checkpointer using the thread_id, identifies the node that was paused at interrupt(...), and re-enters that node with the resume payload bound to the return value of interrupt(...). The rest of the node body executes; the partial state dict it returns is merged into the checkpointed state; the graph continues along the topology — through post_to_erp on approval, or directly to END if your routing handles a "rejected" choice differently. The thread_id is the contract: same thread_id on both calls, and LangGraph finds the right state. A new thread_id would start a fresh run from START.

The variant to know about is Command(goto=...). If the approver needs more information rather than approving or rejecting outright — say the PO match looks wrong and they want the matching to re-run against an updated PO — the resume call can redirect to a different node:

Command(resume={"choice": "needs_info"}, goto="match")

That sends the graph back to match_node rather than continuing to post_to_erp. The state update from the resume still applies; only the next-node dispatch changes. Useful for approver redirects, dispute workflows, or any case where the human approval surface needs more outcomes than a simple approve/reject.

Two practical notes round out the cycle. The dict passed to interrupt(...) must be JSON-serializable because the checkpointer persists it; complex objects need to be reduced to primitives before the interrupt call. And the audit_trail entry written by the node lives inside the partial state dict returned after interrupt(...), never before — placing the write earlier silently corrupts the trail every time the graph resumes.

Durable state with the Postgres checkpointer

Every ranking LangGraph HITL tutorial uses InMemorySaver. AP cannot. The realistic case is an invoice ingested Monday morning and an approver who responds Thursday afternoon, with worker restarts, deploys, and pod evictions in between. InMemorySaver loses state at the first restart, and the resume call from the approval UI fails because the checkpoint doesn't exist anymore. PostgresSaver makes the checkpoint durable across the time it takes a real human to look at a real invoice.

import os
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    conninfo=os.environ["DATABASE_URL"],
    max_size=20,
    kwargs={"autocommit": True, "prepare_threshold": 0},
)
checkpointer = PostgresSaver(pool)
checkpointer.setup()  # creates the checkpoint tables on first run

graph = builder.compile(checkpointer=checkpointer)

checkpointer.setup() runs the migration that creates the checkpoints, checkpoint_blobs, checkpoint_writes, and checkpoint_migrations tables. It is idempotent and safe to run at every deploy, though most teams call it once at provisioning and then leave it out of the steady-state path. The connection pool config follows the recommendations in the LangGraph Postgres docs: autocommit=True and prepare_threshold=0 because the checkpointer manages its own transactions and does not benefit from server-side prepared statements; letting the pool stay out of its way avoids subtle locking issues at high concurrency.

Wiring the checkpointer into builder.compile(checkpointer=checkpointer) is the only graph-level change. Every other line of the graph stays the same. From this point on, every node return — including the implicit checkpoint at the interrupt() call — persists to Postgres against the active thread_id. A worker restart between nodes is invisible to the graph: the next invocation against that thread_id reads the latest checkpoint and continues.

A few operational realities catch AP teams off guard if they aren't planned for up front.

Table growth is real. Every checkpoint write persists state to checkpoint_blobs, and a high-volume graph — 10,000 invoices a month at half a dozen checkpoints per invoice — accumulates gigabytes within a few months. Plan a retention policy from day one. A safe pattern is to delete checkpoints only for threads where the graph has reached END, leaving in-flight threads untouched:

DELETE FROM checkpoints
WHERE thread_id IN (
    SELECT thread_id
    FROM your_application_thread_log
    WHERE status = 'completed'
      AND finished_at < NOW() - INTERVAL '30 days'
);

Source the completed-thread list from whatever your application already tracks; the SQL above is the shape, not the literal table name. The critical invariant is that you never delete a checkpoint whose thread is still paused at an interrupt — the resume call needs that state to exist.

Schema migrations are application-level concerns. If you add or rename a field on APState, in-flight threads were checkpointed against the old shape and will return the old shape on resume. The defense is to read your own state dict defensively in node bodies for at least one approval-cycle's worth of time after the deploy — state.get("new_field") rather than state["new_field"]. For renamed fields, leave the old name in the schema as deprecated for a release before removing it. The Postgres checkpoint format is stable across LangGraph versions; the application schema isn't.

Alternative checkpointers exist for the non-production cases. InMemorySaver is fine for unit tests where the graph runs end-to-end inside a single process. SqliteSaver works for local development and for very small deployments where the entire AP volume fits on a single host's disk. Cloud-managed Postgres — RDS, Cloud SQL, Supabase — works identically to a self-hosted Postgres; only the DATABASE_URL string changes. For multi-region deployments, the checkpointer needs to point at a single primary; cross-region replication for AP throughput volumes is rarely worth the complexity it adds.

The idempotency rule: nodes restart on resume

The most expensive production gotcha in a LangGraph HITL workflow is the one that doesn't show up in tutorials. Per LangGraph's documented interrupt rules, when a LangGraph node hits interrupt() and the graph later resumes via Command(resume=...), the runtime restarts the entire node from the beginning rather than continuing from the line after the interrupt call, which means any side effects placed before the interrupt re-execute on every resume.

The implication for AP is direct. If human_approve_node sends a Slack message before calling interrupt(...), that Slack message gets re-sent every time the graph resumes — and for an invoice that gets redirected back to match_node for more info and then re-routes through human_approve again, the approver's Slack channel fills with duplicates. If the node writes an audit-trail entry before the interrupt, that entry gets re-written on every resume and the trail loses its integrity. If anything more consequential — an ERP post, a payment authorization, a notification to the vendor — sits before an interrupt anywhere in the graph, it fires every time the graph wakes up.

The BAD pattern, written out, looks innocent:

# BAD: side effect before interrupt re-fires on every resume
def bad_human_approve(state):
    send_slack_approval_request(state)   # re-sent on every resume
    decision = interrupt({...})
    return {"approval_decision": decision["choice"]}

On the initial run, Slack gets one message. On every resume thereafter, Slack gets another, because send_slack_approval_request runs every time the node body executes from the top — and the node body executes from the top on every resume. The fix is to move side effects after the interrupt, where they only execute once per resume rather than once per node restart:

# GOOD: side effects placed after the interrupt
def good_human_approve(state):
    decision = interrupt({...})
    write_audit_log(state, decision)     # only runs after a resume completes
    return {
        "approval_decision": decision["choice"],
        "approver_id": decision["approver_id"],
        "audit_trail": [
            {"step": "human_approve", "decision": decision}
        ],
    }

The initial Slack approval request is the awkward case, because it must be sent before the human can respond, but it cannot live inside human_approve_node for the reason above. The clean pattern is a separate side-effect node ahead of human_approve — call it notify_approver_node — that sends the Slack message and writes its own audit entry, then flows into human_approve for the interrupt. The notify node runs exactly once because it has no interrupt to restart against; the human-approve node sits on top of it and stays empty above the interrupt call.

def notify_approver_node(state: APState) -> dict:
    send_slack_approval_request(
        invoice=state["invoice_data"],
        po_match=state["po_match"],
    )
    return {"audit_trail": [{"step": "notify_approver"}]}

# in the graph wiring:
builder.add_node("notify_approver", notify_approver_node)
builder.add_conditional_edges(
    "match",
    routing_decision,
    {
        "auto_approve": "post_to_erp",
        "human_approve": "notify_approver",   # notify first
        "escalate": "escalate",
    },
)
builder.add_edge("notify_approver", "human_approve")

ERP posting needs its own layer of defense even though post_to_erp_node sits well after the interrupt. The risk there is different: the worker can crash after the ERP API call succeeds but before LangGraph checkpoints the node's return value. On restart, LangGraph re-runs the node — and the ERP gets a second post. The application-level fix is the idempotency key already wired into post_to_erp_node earlier: f"{vendor_id}:{invoice_number}" passed to the ERP API in the Idempotency-Key header. A modern ERP API hashes the key, recognizes the duplicate, and returns the original post's response without booking the invoice again. The retry becomes a no-op upstream rather than a duplicate liability on the books.

Audit-trail writes deserve their own treatment because their failure mode is silent. An audit log that double-writes is worse than no audit log because it destroys the trail's integrity for downstream SOX or SOC 2 review — a record that says the approval happened twice is harder to reconcile than a missing record. Write audit entries inside the partial state dict the node returns, never before the interrupt, never via a side effect that fires from the node body before the interrupt call. The audit_trail reducer (Annotated[list[dict], add]) handles the append; the node's only job is to return the new entry alongside the rest of its state update.

A fast test pattern surfaces idempotency violations cheaply. Drive the graph to an interrupt with an in-memory test harness; kill and restart the worker; resume with a stubbed approval; then assert that each side effect fired the expected number of times — exactly once for one-shot effects like the initial Slack message, exactly once per resume for post-interrupt effects, never for side effects that should be idempotency-keyed at the destination. Any effect that re-fires when it shouldn't is a bug the test will catch before production does.

Reusing extraction and validation as a subgraph

A single company often runs the same extract-and-validate pair from more than one document workflow. AP processes supplier invoices; an expense-report graph processes employee receipts; a sales-invoice ingestion graph processes customer billing. The first two nodes are the same primitive in all three. Copying extract_node and validate_node across three codebases means three places to update when the extraction prompt or validation rules change. Subgraphs solve this by letting the extract + validate pair live in one place and be invoked as a single node from each parent graph.

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END


class ExtractionState(TypedDict):
    invoice_url: str
    invoice_data: dict | None
    validation_errors: list[str]
    audit_trail: Annotated[list[dict], add]


extraction_builder = StateGraph(ExtractionState)
extraction_builder.add_node("extract", extract_node)
extraction_builder.add_node("validate", validate_node)
extraction_builder.add_edge(START, "extract")
extraction_builder.add_edge("extract", "validate")
extraction_builder.add_edge("validate", END)
extraction_subgraph = extraction_builder.compile()

The compiled extraction_subgraph is itself a graph. The parent AP graph treats it as a node, registers it with add_node, and wires edges around it as if it were a single function:

builder = StateGraph(APState)
builder.add_node("extract_and_validate", extraction_subgraph)
builder.add_node("match", match_node)
builder.add_node("notify_approver", notify_approver_node)
builder.add_node("human_approve", human_approve_node)
builder.add_node("post_to_erp", post_to_erp_node)
builder.add_node("escalate", escalate_node)

builder.add_edge(START, "extract_and_validate")
builder.add_edge("extract_and_validate", "match")
# ...remaining edges unchanged from the wiring shown earlier

State-schema interop is the part that makes or breaks the composition. The parent and subgraph share field names where they exchange data. APState carries invoice_url, invoice_data, validation_errors, and audit_trail; ExtractionState carries exactly those four fields with the same types. When the parent graph invokes the subgraph node, LangGraph maps the matching keys from APState into the subgraph's ExtractionState, runs the subgraph, and maps the subgraph's output back into the parent state. Fields the parent has and the subgraph doesn't (po_match, approver_id, erp_post_id) pass through untouched because the subgraph never reads or writes them.

When the field names don't line up — a sales-invoice graph that uses document_url rather than invoice_url, or an expense graph that wants receipt_data rather than invoice_data — there's a translation pattern that wraps the subgraph as a function:

def extraction_node_for_expense_graph(state: ExpenseState) -> dict:
    result = extraction_subgraph.invoke({
        "invoice_url": state["receipt_url"],
        "audit_trail": [],
    })
    return {
        "receipt_data": result["invoice_data"],
        "validation_errors": result["validation_errors"],
        "audit_trail": result["audit_trail"],
    }

That gives each parent its own field names without changing the subgraph. The subgraph stays pure; the translation logic lives in the parent, where it belongs.

The operational benefits stack up over time. The subgraph can be tested in isolation against extraction-only fixtures without spinning up the rest of the AP graph. It can be versioned independently — when the extraction prompt or validation rules change, every parent picks up the new behaviour the next time it runs without code changes in the parent. It can carry its own conditional edges if extraction-failure routing ever needs to live inside the subgraph rather than being handled by the parent. And it composes cleanly with checkpointing: subgraph nodes checkpoint to the same Postgres backing the parent, under the same thread_id, so durability and resumability work across the boundary without extra wiring.

LangSmith tracing, and when LangGraph is the wrong tool

LangSmith is the trace viewer most LangGraph teams reach for first, and the integration is small. Set two environment variables and the graph emits a trace per invocation:

LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=ls__...
LANGCHAIN_PROJECT=ap-invoice-graph

Each node call becomes a span. Each interrupt(...) is a visible pause in the trace timeline. Each resume continues the trace as a new run under the same thread_id, so the LangSmith UI shows the full multi-day path of an invoice from extraction through human approval to ERP post on a single timeline. For SOC 2 or SOX teams, the trace becomes a supplement to the audit_trail field on the state — audit_trail carries the structured record the application owns, LangSmith carries the underlying execution detail (inputs, outputs, model calls, latency) for the same flow.

The cost question is the one most AP teams need to model up front. LangSmith pricing is per trace, and a high-volume AP operation that processes tens of thousands of invoices a month will see that bill scale linearly. The standard compromise is sampling — trace 100% of failed and escalated invoices, sample 5-10% of clean auto-approves — implemented at the application layer by toggling LANGCHAIN_TRACING_V2 per invocation or by using the LangSmith SDK's per-run config. Self-hosted observability stacks (OpenTelemetry collectors with a trace backend like Tempo or Jaeger) are an option for teams whose compliance posture prefers data not leaving their boundary; the LangGraph trace exporter supports OTLP for that path.

The honest framing for the rest of this section is the cases where LangGraph isn't the right tool, and where the reader should go instead.

If the AP workflow is genuinely one call — upload, extract, return the structured data — LangGraph is overkill. A LangChain chain with structured output is simpler to write, simpler to deploy, and faster to maintain. The state graph, the checkpointer, the interrupt mechanics, the idempotency rule — none of it earns its complexity if the workflow doesn't actually branch or pause. The decision rule is whether the workflow has multi-step branching or a human gate that can take longer than the surrounding request timeout; if it doesn't, LangChain invoice extraction with structured output covers the simpler pattern.

If the team is committed to Claude as the underlying model and wants tool use and skills natively, the Claude Agent SDK is a more direct fit than LangGraph. It has a different control-flow model — closer to an agent loop than to an explicit graph — and the Skills feature handles reusable workflow blocks differently from LangGraph subgraphs. The Claude Agent SDK and Skills for AP automation article walks the AP build in that framework.

If the framework choice itself is still open — the reader is comparing LangGraph against OpenAI Agents SDK, Claude Agent SDK, and Pydantic AI — the right starting point is the framework-neutral overview that covers each framework's strengths and where they fit the AP shape. For teams that have already decided on the OpenAI stack and want to see the same AP build expressed with Agent, @function_tool, handoffs, and guardrails, build an AP automation agent with the OpenAI Agents SDK walks that variant end-to-end. The agentic invoice processing architecture patterns article holds that overview without committing to a framework. For teams whose AP need is narrower — a typed extraction agent rather than a multi-node approval workflow — the typed Pydantic AI agent for PDF invoice extraction walks the build with BinaryContent input, dependency injection, and self-healing validation.

Extract invoice data to Excel with natural language prompts

Upload your invoices, describe what you need in plain language, and download clean, structured spreadsheets. No templates, no complex configuration.

Exceptional accuracy on financial documents
1–8 seconds per page with parallel processing
50 free pages every month — no subscription
Any document layout, language, or scan quality
Native Excel types — numbers, dates, currencies
Files encrypted and auto-deleted within 24 hours
Continue Reading