LangGraph Quickstart

Master LangGraph, the framework for building stateful AI workflows. Learn to create directed graphs with typed state, streaming responses, and conversation memory.

You have retrieval functions, reranking logic, and generation code. Right now they're isolated Python functions you call sequentially. What happens when you need to:

  • Stream tokens to users as they're generated
  • Remember conversations across multiple turns
  • Handle errors gracefully when a component fails
  • Debug which step broke when things go wrong

LangGraph1 is the glue for building stateful, multi-step AI applications as directed graphs. Each node is a processing step, edges define the flow, and the runtime handles streaming, state management, and checkpointing.

What You'll Build

  • Orchestrate multi-node workflows with typed state
  • Implement streaming responses with StreamWriter
  • Add conversation memory with checkpointers
  • Inject runtime context for shared resources
  • Build a multi-step RAG pipeline

Why LangGraph?

LangGraph models your application as a state machine:

  1. State - A dataclass holding all data that flows through the workflow
  2. Nodes - Functions that read state, do work, and return updates
  3. Edges - Define which node runs next

This structure enables streaming, memory, and debugging out of the box.

Setup

Terminal
git clone https://github.com/mlexpertio/academy.git .
cd academy/context-engineering/langgraph-quickstart
uv sync

We'll use Ollama with the Gemma3 (4B)2 model:

Terminal
ollama pull gemma3:4b
py codecontext-engineering/langgraph-quickstart/common.py
from dataclasses import dataclass, field
from typing import Annotated
 
from langchain.chat_models import BaseChatModel, init_chat_model
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
 
 
@dataclass
class TokenEvent:
    token: str
 
 
def create_model() -> BaseChatModel:
    return init_chat_model(
        "gemma3:4b", model_provider="ollama", seed=42, temperature=0.0
    )

TokenEvent is a custom event type for streaming tokens. The create_model() helper initializes the LLM with consistent settings.

Build a Simple Graph

Let's build a minimal graph that can generate a response to a user's query.

Define the State

State is a dataclass that every node can read and write to. Define what your workflow needs:

py codecontext-engineering/langgraph-quickstart/common.py
@dataclass
class ConversationState:
    query: str = ""
    messages: Annotated[list[AnyMessage], add_messages] = field(default_factory=list)
    answer: str = ""

For our example we'll use these fields:

  • query - The user's input for this turn
  • messages - Conversation history with add_messages reducer (appends instead of replacing)
  • answer - The generated response

The Annotated[list[AnyMessage], add_messages] syntax tells LangGraph how to merge updates. Without it, returning {"messages": [new_msg]} would replace the entire list. With the reducer, it appends to the existing history, essential for multi-turn conversations.

Build a Node

A node is a function that takes state (and optionally other parameters) and returns a dictionary with the state updates:

py codecontext-engineering/langgraph-quickstart/simple_graph.py
def generate_node(state: ConversationState) -> dict:
    llm = create_model()
 
    messages = [
        SystemMessage(content="You are a helpful assistant. Be concise."),
        *state.messages,
        HumanMessage(content=state.query),
    ]
 
    response = llm.invoke(messages)
    answer = response.content
 
    return {
        "answer": answer,
        "messages": [
            HumanMessage(content=state.query),
            AIMessage(content=answer),
        ],
    }

The node reads state.query and state.messages, calls the LLM, and returns a dict with updates. Never mutate state directly, return only the fields that changed.

Connect Nodes with Edges

On it's own, the node is just a function. To enable the "good stuff" from LangGraph, we need to connect it to the rest of the graph (a very simple one in this case):

py codecontext-engineering/langgraph-quickstart/simple_graph.py
workflow = StateGraph(ConversationState)
workflow.add_node("generate", generate_node)
 
workflow.set_entry_point("generate")
workflow.add_edge("generate", END)
 
graph = workflow.compile()

StateGraph(ConversationState) creates a workflow using the state schema. add_node() registers the function, set_entry_point() marks where execution starts, and add_edge() defines the flow. compile() produces the executable graph.

Let's run it:

py codecontext-engineering/langgraph-quickstart/simple_graph.py
result = graph.invoke(ConversationState(query="What is the capital of Bulgaria?"))
print(result["answer"].strip())
Output
Sofia.

Add Streaming

Up until now, your graph is returning the entire response once it is generated. But your users expect faster responses, at least ones that are perceived as such. LangGraph provides StreamWriter to emit events mid-execution:

py codecontext-engineering/langgraph-quickstart/streaming.py
def generate_with_streaming(state: ConversationState, writer: StreamWriter) -> dict:
    llm = create_model()
 
    messages = [
        SystemMessage(content="You are a helpful assistant. Be concise."),
        *state.messages,
        HumanMessage(content=state.query),
    ]
 
    answer = ""
    for chunk in llm.stream(messages):
        if chunk.content:
            answer += chunk.content
            writer(TokenEvent(token=chunk.content))
 
    return {
        "answer": answer,
        "messages": [
            HumanMessage(content=state.query),
            AIMessage(content=answer),
        ],
    }
 
 
workflow = StateGraph(ConversationState)
workflow.add_node("generate", generate_with_streaming)
workflow.set_entry_point("generate")
workflow.add_edge("generate", END)
 
graph = workflow.compile()

The node now accepts a StreamWriter. Each time the LLM produces a chunk, writer(TokenEvent(...)) emits it immediately. Consumers receive these events in real-time while the node continues executing.

To receive streaming events, use stream_mode="custom" when invoking the graph:

py codecontext-engineering/langgraph-quickstart/streaming.py
for event in graph.stream(
    ConversationState(query="Explain which country has the hottest beaches?"),
    stream_mode="custom",
):
    if isinstance(event, TokenEvent):
        print(event.token, end="", flush=True)
Output
It's subjective, but **Maldives** consistently ranks as having the hottest beaches due to its consistently high temperatures and clear, warm waters.

When running the code, you can see the tokens stream in real-time.

Add Conversation Memory

LLMs are stateless, they don't remember any of the previous messages unless you send the full history. LangGraph's checkpointer gives memory out of the box:

py codecontext-engineering/langgraph-quickstart/memory.py
workflow = StateGraph(ConversationState)
workflow.add_node("generate", generate_node)
workflow.set_entry_point("generate")
workflow.add_edge("generate", END)
 
graph = workflow.compile(checkpointer=MemorySaver())

MemorySaver() stores state in memory between invocations. For production, use PostgresSaver3 for persistent storage across restarts.

Pass a thread_id to isolate conversation state per user or session:

py codecontext-engineering/langgraph-quickstart/memory.py
config = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke(
    ConversationState(query="My name is Alice."),
    config=config,
)
print(f"Assistant: {result1['answer']}")
Output
Assistant: Okay, Alice. How can I help you today?
py codecontext-engineering/langgraph-quickstart/memory.py
result2 = graph.invoke(
    ConversationState(query="What's my name?"),
    config=config,
)
print(f"Assistant: {result2['answer']}")
Output
Assistant: Your name is Alice.

The second query received the full message history. The checkpointer stored state after the first call and loaded it for the second. No manual bookkeeping required.

Inject Runtime Context

Often, you have resources that are expensive to create and should be reused across your graph. For example, an LLM client or a database connection. LangGraph provides Runtime context to inject these resources into the graph:

py codecontext-engineering/langgraph-quickstart/runtime_context.py
@dataclass
class AppContext:
    llm: BaseChatModel
 
 
def generate_node(state: ConversationState, runtime: Runtime[AppContext]) -> dict:
    llm = runtime.context.llm
 
    messages = [
        SystemMessage(content="You are a helpful assistant. Be concise."),
        *state.messages,
        HumanMessage(content=state.query),
    ]
 
    response = llm.invoke(messages)
    answer = response.content
 
    return {
        "answer": answer,
        "messages": [
            HumanMessage(content=state.query),
            AIMessage(content=answer),
        ],
    }
 
 
workflow = StateGraph(ConversationState, AppContext)
workflow.add_node("generate", generate_node)
workflow.set_entry_point("generate")
workflow.add_edge("generate", END)
 
graph = workflow.compile(checkpointer=MemorySaver())

StateGraph(ConversationState, AppContext) accepts a second parameter for runtime context. Nodes can access it via the runtime parameter.

Let's invoke the graph with the context:

py codecontext-engineering/langgraph-quickstart/runtime_context.py
context = AppContext(llm=create_model())
 
result = graph.invoke(
    ConversationState(query="What is 2 + 2?"),
    config={"configurable": {"thread_id": "math-session"}},
    context=context,
)
print(result["answer"])
Output
4

The AppContext dataclass is created once and reused across all invocations.

Pass Runtime Config

Some parameters vary per request but don't belong in state (user preferences, filters, feature flags). Use RunnableConfig:

py codecontext-engineering/langgraph-quickstart/runtime_config.py
@dataclass
class AppContext:
    llm: BaseChatModel
 
 
def generate_node(
    state: ConversationState,
    runtime: Runtime[AppContext],
    config: RunnableConfig,
) -> dict:
    style = config["configurable"].get("style", "formal")
    system_prompt = (
        "You are a helpful assistant. Be concise and formal."
        if style == "formal"
        else "You are a friendly assistant. Be casual and fun!"
    )
 
    llm = runtime.context.llm
    messages = [
        SystemMessage(content=system_prompt),
        *state.messages,
        HumanMessage(content=state.query),
    ]
 
    response = llm.invoke(messages)
    return {
        "answer": response.content,
        "messages": [
            HumanMessage(content=state.query),
            AIMessage(content=response.content),
        ],
    }
 
 
workflow = StateGraph(ConversationState, AppContext)
workflow.add_node("generate", generate_node)
workflow.set_entry_point("generate")
workflow.add_edge("generate", END)
 
graph = workflow.compile()

The config: RunnableConfig parameter provides per-request settings. Pass custom values via config={"configurable": {...}} when invoking the graph. Use this for parameters that vary per request (user preferences, filters) but don't belong in state.

py codecontext-engineering/langgraph-quickstart/runtime_config.py
context = AppContext(llm=create_model())
 
result1 = graph.invoke(
    ConversationState(query="Hello!"),
    config={"configurable": {"thread_id": "formal", "style": "formal"}},
    context=context,
)
print(f"Formal: {result1['answer']}")
Output
Formal: How may I assist you today?

Let's change the style to casual and see the difference:

py codecontext-engineering/langgraph-quickstart/runtime_config.py
result2 = graph.invoke(
    ConversationState(query="Hello!"),
    config={"configurable": {"thread_id": "casual", "style": "casual"}},
    context=context,
)
print(f"Casual: {result2['answer']}")
Output
Casual: Hey there! 😄 What's up? How's your day going so far? Let's chat! ✨

Note that we also changed the thread_id to isolate the conversation.

Build a Multi-Node Pipeline

Now let's build a very simple RAG pipeline. It will combine everything we've learned so far. This is a stepping stone for the upcoming FinVault capstone:

py codecontext-engineering/langgraph-quickstart/multi_node.py
@dataclass
class RAGState:
    query: str = ""
    messages: Annotated[list[AnyMessage], add_messages] = field(default_factory=list)
    retrieved_chunks: list[str] = field(default_factory=list)
    context: str = ""
    answer: str = ""
 
 
@dataclass
class RAGContext:
    llm: BaseChatModel
 
 
def retrieve_node(state: RAGState) -> dict:
    chunks = [
        "Python was created by Baba Ginka in 1981.",
        "Python emphasizes comment readability and verbosity.",
        "Python supports just one programming paradigm - functional programming.",
    ]
    return {"retrieved_chunks": chunks}
 
 
def format_context_node(state: RAGState) -> dict:
    context = "\n".join(f"- {chunk}" for chunk in state.retrieved_chunks)
    return {"context": context}
 
 
def generate_node(
    state: RAGState, runtime: Runtime[RAGContext], writer: StreamWriter
) -> dict:
    llm = runtime.context.llm
 
    system = "Answer based only on the provided context. Be concise."
    prompt = f"Context:\n{state.context}\n\nQuestion: {state.query}"
 
    messages = [
        SystemMessage(content=system),
        *state.messages,
        HumanMessage(content=prompt),
    ]
 
    answer = ""
    for chunk in llm.stream(messages):
        if chunk.content:
            answer += chunk.content
            writer(TokenEvent(token=chunk.content))
 
    return {
        "answer": answer,
        "messages": [
            HumanMessage(content=state.query),
            AIMessage(content=answer),
        ],
    }
 
 
workflow = StateGraph(RAGState, RAGContext)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("format_context", format_context_node)
workflow.add_node("generate", generate_node)
 
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "format_context")
workflow.add_edge("format_context", "generate")
workflow.add_edge("generate", END)
 
graph = workflow.compile(checkpointer=MemorySaver())

Three nodes execute in sequence:

  • retrieve fetches chunks (simulated here)
  • format_context structures them for the system_prompt
  • generate produces the answer with streaming

Each node updates specific state fields, and the framework handles data flow between them. Let's test the pipeline:

py codecontext-engineering/langgraph-quickstart/multi_node.py
context = RAGContext(llm=create_model())
 
print("Question: Who created Python?")
print("Answer: ", end="")
 
for event in graph.stream(
    RAGState(query="Who created Python?"),
    config={"configurable": {"thread_id": "python-qa"}},
    context=context,
    stream_mode="custom",
):
    if isinstance(event, TokenEvent):
        print(event.token, end="", flush=True)
 
print()
Output
Question: Who created Python?
Answer: Baba Ginka

This three-node pipeline is the RAG foundation. In the capstone, you'll replace simulated retrieval with a real one.

Async Streaming

Web APIs require async generators, so you don't block the main thread. LangGraph supports this with astream:

py codecontext-engineering/langgraph-quickstart/async_streaming.py
async def generate_node(state: ConversationState):
    llm = create_model()
 
    messages = [
        SystemMessage(content="You are a helpful assistant. Be concise."),
        *state.messages,
        HumanMessage(content=state.query),
    ]
 
    response = await llm.ainvoke(messages)
    return {"messages": [response]}
 
 
async def main():
    workflow = StateGraph(ConversationState)
    workflow.add_node("generate", generate_node)
    workflow.set_entry_point("generate")
    workflow.add_edge("generate", END)
    graph = workflow.compile()
 
    async for message, _ in graph.astream(
        ConversationState(
            query="Who made Python? What is the single most important purpose of Python?"
        ),
        stream_mode="messages",
    ):
        if isinstance(message, AIMessage):
            print(message.content, end="", flush=True)
    print()
 
 
asyncio.run(main())
Output
Python was created by **Guido van Rossum** at CWI in the Netherlands.
 
The single most important purpose of Python is its **readability and versatility**, making it a popular choice for a wide range of applications, including web development, data science, and scripting.

Use async def for node functions and graph.astream() to stream results asynchronously. The stream_mode="messages" yields LangChain message objects as they're produced. This pattern integrates directly with FastAPI's StreamingResponse for Server-Sent Events (SSE).

Loading...

Next Steps

LangGraph lets you compose AI applications as directed graphs. This allows you to build complex workflows and orchestrate them with ease. You now understand:

  • State - Typed dataclasses that flow through your graph
  • Nodes - Functions that transform state
  • Edges - Define execution order
  • Streaming - Emit events mid-execution with StreamWriter
  • Memory - Persist conversations with checkpointers
  • Context - Inject shared resources like LLMs and databases
  • Config - Pass per-request parameters

In the capstone project, you'll apply these concepts to build FinVault, a production RAG system with:

  • A LangGraph workflow orchestrating retrieval, reranking, and generation
  • FastAPI endpoints with SSE streaming
  • A Streamlit UI consuming the stream
  • Background task processing for document ingestion

References

Footnotes

  1. LangGraph by LangChain ↩

  2. Gemma3 (4B) model ↩

  3. PostgresSaver ↩