流式处理
对于构建在 LLM 之上的应用程序来说,流式处理对于增强响应性至关重要。通过逐步显示输出,即使在完整响应准备好之前,流式处理也能显著改善用户体验 (UX),尤其是在处理 LLM 的延迟时。
概述
从 LLM 生成完整响应通常会产生几秒钟的延迟,这在具有多个模型调用的复杂应用程序中变得更加明显。幸运的是,LLM 迭代地生成响应,允许在生成中间结果时显示它们。通过流式传输这些中间输出,LangChain 可以在 LLM 驱动的应用程序中实现更流畅的 UX,并在其设计的核心提供对流式处理的内置支持。
在本指南中,我们将讨论 LLM 应用程序中的流式处理,并探讨 LangChain 的流式处理 API 如何促进应用程序中各种组件的实时输出。
LLM 应用程序中要流式处理什么
在涉及 LLM 的应用程序中,可以流式传输几种类型的数据,以通过减少感知延迟和增加透明度来改善用户体验。这些包括
1. 流式处理 LLM 输出
最常见和最关键的流式处理数据是 LLM 本身生成的输出。LLM 通常需要时间来生成完整响应,通过实时流式传输输出,用户可以在生成部分结果时看到它们。这提供了即时反馈,并有助于减少用户的等待时间。
2. 流式处理管道或工作流程进度
除了仅流式传输 LLM 输出之外,流式传输更复杂的工作流程或管道的进度也很有用,这使用户可以了解应用程序的整体进度。这可能包括
在 LangGraph 工作流程中: 使用 LangGraph,工作流程由代表各个步骤的节点和边组成。此处的流式处理涉及跟踪 图状态 的更改,因为各个 节点 请求更新。这可以更精细地监控工作流程中当前处于活动状态的节点,从而实时更新工作流程在不同阶段的进展状态。
在 LCEL 管道中: 从 LCEL 管道流式传输更新涉及捕获各个 子 runnable 的进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子 runnable,从而提供对整个管道进度的实时洞察。
流式处理管道或工作流程进度对于向用户清晰地展示应用程序在执行过程中的位置至关重要。
3. 流式处理自定义数据
在某些情况下,您可能需要流式传输超出管道或工作流程结构提供的信息的 自定义数据。此自定义信息注入在工作流程中的特定步骤中,无论该步骤是工具还是 LangGraph 节点。例如,您可以实时流式传输有关工具正在执行的操作的更新或 LangGraph 节点的进度。这种粒度数据直接从步骤内部发出,提供对工作流程执行的更详细的洞察,并且在需要更高可见性的复杂流程中尤其有用。
流式处理 API
LangChain 提供了两个主要的 API 用于实时流式传输输出。这些 API 由任何实现 Runnable 接口 的组件支持,包括 LLM、编译后的 LangGraph 图,以及使用 LCEL 生成的任何 Runnable。
stream
:用于流式传输来自各个 Runnable(例如,聊天模型)的输出,因为它们是生成的,或流式传输使用 LangGraph 创建的任何工作流程。streamEvents
:使用此 API 来访问完全使用 LCEL 构建的 LLM 应用程序的自定义事件和中间输出。请注意,此 API 是可用的,但在使用 LangGraph 时不需要。
此外,还有一个 旧版 streamLog API。不建议新项目使用此 API,它比其他流式处理 API 更复杂且功能更少。
stream()
stream()
方法返回一个迭代器,该迭代器在生成时同步地生成输出块。您可以使用 for await
循环来实时处理每个块。例如,当使用 LLM 时,这允许输出在生成时以增量方式流式传输,从而减少用户的等待时间。
stream()
方法生成的块类型取决于正在流式传输的组件。例如,当从 LLM 流式传输时,每个组件都将是一个 AIMessageChunk
;但是,对于其他组件,块可能有所不同。
stream()
方法返回一个迭代器,该迭代器在生成时生成这些块。例如,
for await (const chunk of await component.stream(someInput)) {
// IMPORTANT: Keep the processing of each chunk as efficient as possible.
// While you're processing the current chunk, the upstream component is
// waiting to produce the next one. For example, if working with LangGraph,
// graph execution is paused while the current chunk is being processed.
// In extreme cases, this could even result in timeouts (e.g., when llm outputs are
// streamed from an API that has a timeout).
console.log(chunk);
}
与聊天模型一起使用
当将 stream()
与聊天模型一起使用时,输出将以 AIMessageChunks
的形式流式传输,因为它是由 LLM 生成的。这允许您在 LLM 的输出生成时以增量方式呈现或处理它,这在交互式应用程序或界面中特别有用。
与 LangGraph 一起使用
LangGraph 编译图是 Runnables,并支持标准流式处理 API。
当将 *stream* 和方法与 LangGraph 一起使用时,您可以选择 一个或多个 流式处理模式,这些模式允许您控制流式传输的输出类型。可用的流式处理模式是
- "values":为每个步骤发出 状态 的所有值。
- "updates":在每个步骤之后,仅发出节点名称和节点返回的更新。
- "debug":为每个步骤发出调试事件。
- "messages":发出 LLM 消息 逐个 token。
有关更多信息,请参阅
- LangGraph 流式处理概念指南,以获取有关在使用 LangGraph 时如何流式传输的更多信息。
- LangGraph 流式处理操作指南,以获取 LangGraph 中流式处理的具体示例。
与 LCEL 一起使用
如果您使用 LangChain 的表达式语言 (LCEL) 组合多个 Runnables,则按照约定,stream()
方法将流式传输链中最后一步的输出。这允许最终处理的结果以增量方式流式传输。LCEL 尝试优化管道中的流式处理延迟,以便尽快获得最后一步的流式处理结果。
streamEvents
对于使用 LCEL 构建的链,.stream()
方法仅流式传输链中最后一步的输出。这对于某些应用程序可能就足够了,但是当您构建更复杂的由多个 LLM 调用组成的链时,您可能希望将链的中间值与最终输出一起使用。例如,在构建基于文档的聊天应用程序时,您可能希望返回来源以及最终生成的结果。
有一些方法可以 使用回调 来做到这一点,或者通过以某种方式构建您的链,使其使用类似链式 .assign()
调用的方式将中间值传递到末尾,但 LangChain 也包含一个 .streamEvents()
方法,该方法结合了回调的灵活性和 .stream()
的人体工程学。调用时,它返回一个迭代器,该迭代器生成 各种类型的事件,您可以根据项目的需要对其进行过滤和处理。
这是一个小示例,仅打印包含流式聊天模型输出的事件
import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { ChatAnthropic } from "@langchain/anthropic";
const model = new ChatAnthropic({ model: "claude-3-sonnet-20240229" });
const prompt = ChatPromptTemplate.fromTemplate("tell me a joke about {topic}");
const parser = StringOutputParser();
const chain = prompt.pipe(model).pipe(parser);
for await (const event of await chain.streamEvents(
{ topic: "parrot" },
{ version: "v2" }
)) {
if (event.event === "on_chat_model_stream") {
console.log(event);
}
}
您可以大致将其视为回调事件的迭代器(尽管格式有所不同) - 您几乎可以在所有 LangChain 组件上使用它!
有关如何使用 .streamEvents()
的更详细信息,包括列出可用事件的表格,请参阅 本指南。
将自定义数据写入流
要将自定义数据写入流,您需要根据您正在使用的组件选择以下方法之一
- dispatch_events 可用于写入自定义数据,这些数据将通过 streamEvents API 公开。有关更多信息,请参阅 如何分派自定义回调事件。
“自动流式处理”聊天模型
LangChain 通过在某些情况下自动启用流式处理模式来简化从 聊天模型 的流式处理,即使您没有显式调用流式处理方法。当您使用非流式处理的 invoke
方法但仍希望流式处理整个应用程序(包括来自聊天模型的中间结果)时,这尤其有用。
工作原理
当您在聊天模型上调用 invoke
方法时,如果 LangChain 检测到您正在尝试流式处理整个应用程序,它将自动切换到流式处理模式。
在幕后,它将让 invoke
使用 stream
方法来生成其输出。调用的结果与使用 invoke
的代码所关心的结果相同;但是,在流式传输聊天模型时,LangChain 将负责在 LangChain 的 回调系统 中调用 on_llm_new_token
事件。这些回调事件允许 LangGraph stream
和 streamEvents
实时显示聊天模型的输出。
示例
const node = (state) => {
...
// The code below uses the invoke method, but LangChain will
// automatically switch to streaming mode
// when it detects that the overall
// application is being streamed.
ai_message = model.invoke(state["messages"])
...
for await (const chunk of await compiledGraph.stream(..., { streamMode: "messages" })) {
// ... do something
}
}
相关资源
请参阅以下操作指南,以获取 LangChain 中流式处理的具体示例
- LangGraph 流式处理概念指南
- LangGraph 流式处理操作指南
- 如何流式传输 runnables:本操作指南介绍了 LangChain 组件(例如,聊天模型)和 LCEL 的常见流式处理模式。
- 如何流式传输聊天模型
- 如何流式传输工具调用
有关将自定义数据写入流的信息,请参阅以下资源
- 如果使用 LCEL,请参阅 如何分派自定义回调事件。