
Streaming

Streaming is crucial for enhancing the responsiveness of applications built on LLMs. By displaying output progressively, even before a complete response is ready, streaming significantly improves user experience (UX), particularly when dealing with the latency of LLMs.

Overview

Generating full responses from LLMs often incurs a delay of several seconds, which becomes more noticeable in complex applications with multiple model calls. Fortunately, LLMs generate responses iteratively, allowing for intermediate results to be displayed as they are produced. By streaming these intermediate outputs, LangChain enables smoother UX in LLM-powered apps and offers built-in support for streaming at the core of its design.

In this guide, we'll discuss streaming in LLM applications and explore how LangChain's streaming APIs facilitate real-time output from various components in your application.

What to stream in LLM applications

In applications involving LLMs, several types of data can be streamed to improve user experience by reducing perceived latency and increasing transparency. These include:

1. Streaming LLM outputs

The most common and critical data to stream is the output generated by the LLM itself. LLMs often take time to generate full responses, and by streaming the output in real-time, users can see partial results as they are produced. This provides immediate feedback and helps reduce the wait time for users.

2. Streaming pipeline or workflow progress

Beyond just streaming LLM output, it’s useful to stream progress through more complex workflows or pipelines, giving users a sense of how the application is progressing overall. This could include:

  • In LangGraph Workflows: With LangGraph, workflows are composed of nodes and edges that represent various steps. Streaming here involves tracking changes to the graph state as individual nodes request updates. This allows for more granular monitoring of which node in the workflow is currently active, giving real-time updates about the status of the workflow as it progresses through different stages.

  • In LCEL Pipelines: Streaming updates from an LCEL pipeline involves capturing progress from individual sub-runnables. For example, as different steps or components of the pipeline execute, you can stream which sub-runnable is currently running, providing real-time insight into the overall pipeline's progress.

Streaming pipeline or workflow progress is essential in providing users with a clear picture of where the application is in the execution process.

3. Streaming custom data

In some cases, you may need to stream custom data that goes beyond the information provided by the pipeline or workflow structure. This custom information is injected within a specific step in the workflow, whether that step is a tool or a LangGraph node. For example, you could stream updates about what a tool is doing in real-time or the progress through a LangGraph node. This granular data, which is emitted directly from within the step, provides more detailed insights into the execution of the workflow and is especially useful in complex processes where more visibility is needed.

Streaming APIs

LangChain provides two main APIs for streaming output in real-time. These APIs are supported by any component that implements the Runnable Interface, including LLMs, compiled LangGraph graphs, and any Runnable generated with LCEL.

  1. sync stream and async astream: Use these to stream outputs from individual Runnables (e.g., a chat model) as they are generated, or to stream any workflow created with LangGraph.
  2. The async-only astream_events: Use this API to get access to custom events and intermediate outputs from LLM applications built entirely with LCEL. Note that this API is available, but not needed, when working with LangGraph.

Note: In addition, there is a legacy async astream_log API. This API is not recommended for new projects because it is more complex and less feature-rich than the other streaming APIs.

stream() and astream()

The stream() method returns an iterator that yields chunks of output synchronously as they are produced. You can use a for loop to process each chunk in real-time. For example, when using an LLM, this allows the output to be streamed incrementally as it is generated, reducing the wait time for users.

The type of chunk yielded by the stream() and astream() methods depends on the component being streamed. For example, when streaming from an LLM each chunk will be an AIMessageChunk; however, for other components, the chunk may be different.

Because stream() returns an iterator, you can consume the chunks with an ordinary for loop. For example:

for chunk in component.stream(some_input):
    # IMPORTANT: Keep the processing of each chunk as efficient as possible.
    # While you're processing the current chunk, the upstream component is
    # waiting to produce the next one. For example, if working with LangGraph,
    # graph execution is paused while the current chunk is being processed.
    # In extreme cases, this could even result in timeouts (e.g., when LLM
    # outputs are streamed from an API that has a timeout).
    print(chunk)

The asynchronous version, astream(), works similarly but is designed for non-blocking workflows. You can use it in asynchronous code to achieve the same real-time streaming behavior.
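
The asynchronous pattern can be sketched without a model or API key. In the example below, fake_astream is a hypothetical stand-in for a component's astream() method (it is not part of LangChain); only the async for consumption loop mirrors how you would read chunks from a real Runnable.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_astream(text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for component.astream(); yields one word per chunk."""
    for word in text.split():
        # Hand control back to the event loop, as a network-backed stream would.
        await asyncio.sleep(0)
        yield word


async def collect(text: str) -> List[str]:
    received: List[str] = []
    async for chunk in fake_astream(text):
        # Keep per-chunk work light, for the same reason as in the sync loop.
        received.append(chunk)
    return received


chunks = asyncio.run(collect("streaming keeps users informed"))
print(chunks)
```

With a real component you would replace fake_astream(text) with component.astream(some_input) and keep the rest of the loop unchanged.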

Usage with chat models

When using stream() or astream() with chat models, the output is streamed as AIMessageChunks as it is generated by the LLM. This allows you to present or process the LLM's output incrementally as it's being produced, which is particularly useful in interactive applications or interfaces.
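
A common way to process output incrementally is to display each chunk as it arrives while also accumulating the chunks into the full message. The sketch below uses a hypothetical TextChunk class and fake_chat_stream generator as stand-ins (neither is a LangChain name); TextChunk models only the text content and the ability to concatenate chunks with +, which LangChain's message chunks also support.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class TextChunk:
    """Hypothetical stand-in for a message chunk; models only the text content."""

    content: str

    def __add__(self, other: "TextChunk") -> "TextChunk":
        # Concatenating two chunks yields a chunk holding the combined text.
        return TextChunk(self.content + other.content)


def fake_chat_stream() -> Iterator[TextChunk]:
    """Hypothetical stand-in for model.stream(...)."""
    for piece in ["Why did ", "the parrot ", "cross the road?"]:
        yield TextChunk(piece)


full: Optional[TextChunk] = None
for chunk in fake_chat_stream():
    # Show the partial output immediately...
    print(chunk.content, end="", flush=True)
    # ...and keep a running total for later use.
    full = chunk if full is None else full + chunk
print()
```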

Usage with LangGraph

LangGraph compiled graphs are Runnables and support the standard streaming APIs.

When using the stream and astream methods with LangGraph, you can choose one or more streaming modes, which control the type of output that is streamed. The available streaming modes are:

  • "values": Emit all values of the state for each step.
  • "updates": Emit only the node name(s) and updates that were returned by the node(s) after each step.
  • "debug": Emit debug events for each step.
  • "messages": Emit LLM messages token-by-token.
  • "custom": Emit custom output written using LangGraph's StreamWriter.
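
To make the difference between "values" and "updates" concrete, here is a toy sequential runner written in plain Python. It is not LangGraph: run_toy_graph, the node names, and the state keys are all hypothetical, and the toy emits output only after each node runs. It is meant only to show that "updates" carries the node name plus what that node returned, while "values" carries the whole state.

```python
from typing import Callable, Dict, Iterator, List, Tuple

State = Dict[str, object]
Node = Tuple[str, Callable[[State], State]]


def run_toy_graph(nodes: List[Node], state: State, stream_mode: str) -> Iterator[dict]:
    """Toy sequential runner (not LangGraph) that mimics two stream modes."""
    if stream_mode not in ("values", "updates"):
        raise ValueError(f"unsupported stream_mode: {stream_mode}")
    current = dict(state)
    for name, node in nodes:
        update = node(current)  # a node returns only the keys it changed
        current.update(update)
        if stream_mode == "updates":
            yield {name: update}  # node name plus what it returned
        else:
            yield dict(current)  # snapshot of the full state


nodes: List[Node] = [
    ("retrieve", lambda s: {"docs": ["doc-1", "doc-2"]}),
    ("answer", lambda s: {"answer": f"used {len(s['docs'])} docs"}),
]

updates = list(run_toy_graph(nodes, {"question": "q"}, "updates"))
values = list(run_toy_graph(nodes, {"question": "q"}, "values"))
print(updates)
print(values)
```

In this sketch "updates" stays small no matter how large the state grows, which is why it tends to suit progress indicators, whereas "values" is convenient when the consumer needs the complete state at every step.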

For more information, please see the LangGraph documentation on streaming.

Usage with LCEL

If you compose multiple Runnables using the LangChain Expression Language (LCEL), the stream() and astream() methods will, by convention, stream the output of the last step in the chain. This allows the final processed result to be streamed incrementally. LCEL tries to optimize streaming latency in pipelines so that the streaming results from the last step are available as soon as possible.
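
As a plain-Python analogy (not LCEL itself), the sketch below chains two generators: a hypothetical fake_model that emits tokens and a hypothetical upper_parser that transforms each token as it arrives. The log shows that the caller receives the first transformed chunk before the model has produced its second token, which is the behavior a streaming-friendly last step makes possible.

```python
from typing import Iterable, Iterator, List

log: List[str] = []


def fake_model(prompt: str) -> Iterator[str]:
    """Hypothetical stand-in for a streaming model step."""
    for token in ["polly ", "wants ", "a ", "cracker"]:
        log.append(f"model emitted {token!r}")
        yield token


def upper_parser(tokens: Iterable[str]) -> Iterator[str]:
    """Hypothetical last step: transforms each chunk without waiting for the rest."""
    for token in tokens:
        yield token.upper()


received: List[str] = []
for chunk in upper_parser(fake_model("tell me about parrots")):
    log.append(f"caller received {chunk!r}")
    received.append(chunk)

print("\n".join(log))
```

If upper_parser instead collected every token before yielding anything, the caller would see nothing until the model finished, so a step that cannot operate on partial input delays the whole stream.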

astream_events

Tip: Use the astream_events API to access custom data and intermediate outputs from LLM applications built entirely with LCEL.

While this API is available for use with LangGraph as well, it is usually not necessary when working with LangGraph, as the stream and astream methods provide comprehensive streaming capabilities for LangGraph graphs.

For chains constructed using LCEL, the .stream() method only streams the output of the final step of the chain. This might be sufficient for some applications, but as you build more complex chains that combine several LLM calls, you may want to use the intermediate values of the chain alongside the final output. For example, you may want to return sources alongside the final generation when building a chat-over-documents app.

There are ways to do this using callbacks, or by constructing your chain in such a way that it passes intermediate values to the end with something like chained .assign() calls, but LangChain also includes an .astream_events() method that combines the flexibility of callbacks with the ergonomics of .stream(). When called, it returns an async iterator that yields various types of events, which you can filter and process according to the needs of your project.

Here's one small example that prints just events containing streamed chat model output:

import asyncio

from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser


async def main():
    # `async for` must run inside a coroutine when executed as a script.
    async for event in chain.astream_events({"topic": "parrot"}, version="v2"):
        kind = event["event"]
        # Keep only events that carry streamed chat model output.
        if kind == "on_chat_model_stream":
            print(event, end="|", flush=True)


asyncio.run(main())

You can roughly think of it as an iterator over callback events (though the format differs) - and you can use it on almost all LangChain components!

See this guide for more detailed information on how to use .astream_events(), including a table listing available events.

Writing custom data to the stream

To write custom data to the stream, you will need to choose one of the following methods based on the component you are working with:

  1. LangGraph's StreamWriter can be used to write custom data that will surface through the stream and astream APIs when working with LangGraph. Important: this is a LangGraph feature, so it is not available when working with pure LCEL. See the guide on how to stream custom data for more information.
  2. dispatch_custom_event / adispatch_custom_event can be used to write custom data that will be surfaced through the astream_events API. See the guide on how to dispatch custom callback events for more information.

"Auto-Streaming" Chat Models

LangChain simplifies streaming from chat models by automatically enabling streaming mode in certain cases, even when you’re not explicitly calling the streaming methods. This is particularly useful when you use the non-streaming invoke method but still want to stream the entire application, including intermediate results from the chat model.

How It Works

When you call the invoke (or ainvoke) method on a chat model, LangChain will automatically switch to streaming mode if it detects that you are trying to stream the overall application.

Under the hood, invoke (or ainvoke) will use the stream (or astream) method to generate its output. As far as the code calling invoke is concerned, the result of the invocation is the same; however, while the chat model is being streamed, LangChain takes care of emitting on_llm_new_token events in its callback system. These callback events allow LangGraph stream/astream and astream_events to surface the chat model's output in real-time.

Example:

def node(state):
    ...
    # The code below uses the invoke method, but LangChain will
    # automatically switch to streaming mode
    # when it detects that the overall
    # application is being streamed.
    ai_message = model.invoke(state["messages"])
    ...

for chunk in compiled_graph.stream(..., stream_mode="messages"):
    ...
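
The idea of invoke quietly delegating to stream can be pictured with a toy model in plain Python. This is only an illustration of the mechanism under simplifying assumptions, not LangChain's implementation: ToyChatModel and the on_new_token parameter are hypothetical, with on_new_token standing in for the callback system detecting that someone is streaming the application.

```python
from typing import Callable, Iterator, List, Optional


class ToyChatModel:
    """Hypothetical model used only to illustrate the idea; not LangChain code."""

    def _tokens(self) -> List[str]:
        return ["Hello", ", ", "world"]

    def stream(self, prompt: str) -> Iterator[str]:
        for token in self._tokens():
            yield token

    def invoke(
        self, prompt: str, on_new_token: Optional[Callable[[str], None]] = None
    ) -> str:
        if on_new_token is None:
            # Nobody is listening for tokens: produce the answer in one go.
            return "".join(self._tokens())
        # A listener is attached: generate via stream() and report each token,
        # but still hand the caller the same complete result.
        parts: List[str] = []
        for token in self.stream(prompt):
            on_new_token(token)
            parts.append(token)
        return "".join(parts)


model = ToyChatModel()
seen: List[str] = []
plain = model.invoke("hi")
streamed = model.invoke("hi", on_new_token=seen.append)
print(plain == streamed, seen)
```

The caller of invoke gets an identical return value in both cases; only the listener observes the individual tokens.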

Async Programming

LangChain offers both synchronous (sync) and asynchronous (async) versions of many of its methods. The async methods are typically prefixed with an "a" (e.g., ainvoke, astream). When writing async code, it's crucial to consistently use these asynchronous methods to ensure non-blocking behavior and optimal performance.

If streaming data fails to appear in real-time, please ensure that you are using the correct async methods for your workflow.
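
One way to see why mixing sync calls into async code can hold back streamed output is the following plain asyncio sketch. All names here are hypothetical stand-ins rather than LangChain APIs: async_source plays the role of an astream-style method that yields control to the event loop, and sync_source plays the role of a sync stream-style method called from inside a coroutine.

```python
import asyncio
from typing import AsyncIterator, Iterator, List, Tuple


async def async_source(n: int) -> AsyncIterator[int]:
    """Stand-in for an async streaming method: yields control between chunks."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i


def sync_source(n: int) -> Iterator[int]:
    """Stand-in for a sync streaming method: never yields control to the loop."""
    for i in range(n):
        yield i


async def consume_async(name: str, order: List[str]) -> None:
    async for i in async_source(2):
        order.append(f"{name}{i}")


async def consume_blocking(name: str, order: List[str]) -> None:
    # A sync loop inside a coroutine runs to completion before other tasks resume.
    for i in sync_source(2):
        order.append(f"{name}{i}")


async def main() -> Tuple[List[str], List[str]]:
    interleaved: List[str] = []
    serialized: List[str] = []
    await asyncio.gather(consume_async("a", interleaved), consume_async("b", interleaved))
    await asyncio.gather(consume_blocking("a", serialized), consume_blocking("b", serialized))
    return interleaved, serialized


interleaved, serialized = asyncio.run(main())
print(interleaved)
print(serialized)
```

With the async source, the two consumers take turns, so each sees its chunks as they become available. With the sync source, the first consumer monopolizes the event loop and the second receives nothing until the first has finished.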

Please review the async programming in LangChain guide for more information on writing async code with LangChain.


