
How to stream

Prerequisites

This guide assumes familiarity with the following concepts:

Streaming is critical in making applications based on LLMs feel responsive to end users.

Important LangChain primitives like LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable interface.

This interface provides two general approaches to stream content:

  • .stream(): a default implementation of streaming, which streams the final output from the chain.
  • .streamEvents() and .streamLog(): these provide a way to stream both intermediate steps and the final output from the chain.

Let's take a look at both approaches!

For a higher-level overview of streaming techniques in LangChain, see this section of the conceptual guide.

Using Stream

All Runnable objects implement a method called stream.

This method is designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if all steps in the program know how to process an input stream; i.e., process an input chunk one at a time, and yield a corresponding output chunk.

The complexity of this processing can vary, from straightforward tasks like emitting tokens produced by an LLM, to more challenging ones like streaming parts of a JSON result before the entire JSON is complete.

The best place to start exploring streaming is with the single most important component in LLM apps: the models themselves!

LLMs and Chat Models

Large language models can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.

The key strategy to make the application feel more responsive is to show intermediate progress; e.g., to stream the output from the model token by token.

import "dotenv/config";

Pick your chat model:

Install dependencies:

Tip

See this section for general instructions on installing integration packages.

yarn add @langchain/groq 

Add environment variables:

GROQ_API_KEY=your-api-key

Instantiate the model:

import { ChatGroq } from "@langchain/groq";

const model = new ChatGroq({
  model: "llama-3.3-70b-versatile",
  temperature: 0,
});

const stream = await model.stream("Hello! Tell me about yourself.");

const chunks = [];
for await (const chunk of stream) {
  chunks.push(chunk);
  console.log(`${chunk.content}|`);
}
|
Hello|
!|
I'm|
a|
large|
language|
model|
developed|
by|
Open|
AI|
called|
GPT|
-|
4|
,|
based|
on|
the|
Gener|
ative|
Pre|
-trained|
Transformer|
architecture|
.|
I'm|
designed|
to|
understand|
and|
generate|
human|
-like|
text|
based|
on|
the|
input|
I|
receive|
.|
My|
primary|
function|
is|
to|
assist|
with|
answering|
questions|
,|
providing|
information|
,|
and|
engaging|
in|
various|
types|
of|
conversations|
.|
While|
I|
don't|
have|
personal|
experiences|
or|
emotions|
,|
I'm|
trained|
on|
diverse|
datasets|
that|
enable|
me|
to|
provide|
useful|
and|
relevant|
information|
across|
a|
wide|
array|
of|
topics|
.|
How|
can|
I|
assist|
you|
today|
?|
|
|

Let's take a look at one of the raw chunks:

chunks[0];
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: '',
tool_call_chunks: [],
additional_kwargs: {},
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}

We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.

Message chunks are additive by design: one can simply add them up using the .concat() method to get the state of the response so far!

let finalChunk = chunks[0];

for (const chunk of chunks.slice(1, 5)) {
  finalChunk = finalChunk.concat(chunk);
}

finalChunk;
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Hello! I'm a",
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_call_chunks: [],
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: []
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: "Hello! I'm a",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}

Almost all LLM applications involve more steps than just a call to a language model.

Let's build a simple chain using the LangChain Expression Language (LCEL) that combines a prompt, a model, and a parser, and verify that streaming works.

We will use the StringOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the tokens returned by the model.

Tip

LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of stream, allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.

import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");

const parser = new StringOutputParser();

const chain = prompt.pipe(model).pipe(parser);

const stream = await chain.stream({
  topic: "parrot",
});

for await (const chunk of stream) {
  console.log(`${chunk}|`);
}
|
Sure|
,|
here's|
a|
joke|
for|
you|
:

|
Why|
did|
the|
par|
rot|
sit|
on|
the|
stick|
?

|
Because|
it|
wanted|
to|
be|
a|
"|
pol|
ly|
-stick|
-al|
"|
observer|
!|
|
|
Note

You do not have to use the LangChain Expression Language to use LangChain. You can instead rely on a standard imperative programming approach by calling invoke, batch, or stream on each component individually, assigning the results to variables, and then using them downstream as you see fit.

If that works for your needs, then that's fine by us 👌!
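For instance, a minimal imperative sketch of the same prompt + model + parser flow from above might look like this (the variable names here are just for illustration):

import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";

// Build the same pieces as the LCEL chain above, but wire them together by hand.
const jokePrompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");
const stringParser = new StringOutputParser();

// Invoke the prompt yourself, stream the model, and parse each chunk as it arrives.
const promptValue = await jokePrompt.invoke({ topic: "parrot" });
const modelStream = await model.stream(promptValue);

for await (const chunk of modelStream) {
  console.log(`${await stringParser.invoke(chunk)}|`);
}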

Working with Input Streams

What if you wanted to stream JSON from the output as it was being generated?

If you were to rely on JSON.parse to parse the partial JSON, the parsing would fail, because partial JSON is not valid JSON.

You might be at a complete loss about what to do and claim that it's impossible to stream JSON.

Well, it turns out there is a way to do it: the parser needs to operate on the input stream and attempt to "auto-complete" the partial JSON into a valid state.

Let's see such a parser in action to understand what this means.

import { JsonOutputParser } from "@langchain/core/output_parsers";

const chain = model.pipe(new JsonOutputParser());
const stream = await chain.stream(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
{
countries: [
{ name: 'France', population: 67390000 },
{ name: 'Spain', population: 47350000 },
{ name: 'Japan', population: 125800000 }
]
}

Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON. Since this new last step is just a function call with no defined streaming behavior, the streaming output from the previous steps is aggregated and then passed as a single input to the function.

Danger

Any step in the chain that operates on finalized inputs rather than on input streams can break streaming functionality via stream.

Tip

Later, we'll discuss the streamEvents API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.

// A function that operates on finalized inputs rather than on an input
// stream. Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
};

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
["France","Spain","Japan"]

Non-streaming components

Like the example above, some built-in components, such as retrievers, do not offer any streaming. What happens if we try to stream them?

import { OpenAIEmbeddings } from "@langchain/openai";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const template = `Answer the question based only on the following context:
{context}

Question: {question}
`;
const prompt = ChatPromptTemplate.fromTemplate(template);

const vectorstore = await MemoryVectorStore.fromTexts(
  ["mitochondria is the powerhouse of the cell", "buildings are made of brick"],
  [{}, {}],
  new OpenAIEmbeddings()
);

const retriever = vectorstore.asRetriever();

const chunks = [];

for await (const chunk of await retriever.stream(
  "What is the powerhouse of the cell?"
)) {
  chunks.push(chunk);
}

console.log(chunks);
[
[
Document {
pageContent: 'mitochondria is the powerhouse of the cell',
metadata: {},
id: undefined
},
Document {
pageContent: 'buildings are made of brick',
metadata: {},
id: undefined
}
]
]

Stream just yielded the final result from that component.

This is OK! Not all components have to implement streaming; in some cases streaming is either unnecessary, difficult, or just doesn't make sense.

Tip

An LCEL chain constructed with some non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.

Here is an example:

import {
RunnablePassthrough,
RunnableSequence,
} from "@langchain/core/runnables";
import type { Document } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";

const formatDocs = (docs: Document[]) => {
  return docs.map((doc) => doc.pageContent).join("\n-----\n");
};

const retrievalChain = RunnableSequence.from([
  {
    context: retriever.pipe(formatDocs),
    question: new RunnablePassthrough(),
  },
  prompt,
  model,
  new StringOutputParser(),
]);

const stream = await retrievalChain.stream(
  "What is the powerhouse of the cell?"
);

for await (const chunk of stream) {
  console.log(`${chunk}|`);
}
|
M|
ito|
ch|
ond|
ria|
is|
the|
powerhouse|
of|
the|
cell|
.|
|
|

Now that we've seen how the stream method works, let's venture into the world of streaming events!

Using Stream Events

Event streaming is a beta API. This API may change a bit based on feedback.

Note

Introduced in @langchain/core 0.1.27.

For the streamEvents method to work properly:

  • Any custom functions / runnables must propagate callbacks (see the sketch after this list)
  • Set proper parameters on models to force the LLM to stream tokens
  • Let us know if anything doesn't work as expected!
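As a sketch of the first point: when a custom function invokes another runnable internally, forwarding the config it receives is what propagates callbacks and lets streamEvents observe the nested runs. The helper names below are illustrative:

import { RunnableLambda } from "@langchain/core/runnables";
import type { RunnableConfig } from "@langchain/core/runnables";

const reverse = RunnableLambda.from((text: string) =>
  text.split("").reverse().join("")
);

// Forwarding `config` into the inner .invoke() calls is what propagates
// callbacks, so streamEvents can see the nested runs.
const reverseTwice = RunnableLambda.from(
  async (text: string, config?: RunnableConfig) => {
    const once = await reverse.invoke(text, config);
    return reverse.invoke(once, config);
  }
);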

Event Reference

Below is a reference table that shows some events that might be emitted by the various Runnable objects.

Note

When streaming is implemented properly, the input to a runnable will not be known until after the input stream has been entirely consumed. This means that inputs will often be included only for end events, not for start events.

| event | name | chunk | input | output |
| --- | --- | --- | --- | --- |
| on_llm_start | [model name] |  | {'input': 'hello'} |  |
| on_llm_stream | [model name] | 'Hello' OR AIMessageChunk(content="hello") |  |  |
| on_llm_end | [model name] |  | 'Hello human!' | {"generations": [...], "llmOutput": None, ...} |
| on_chain_start | format_docs |  |  |  |
| on_chain_stream | format_docs | "hello world!, goodbye world!" |  |  |
| on_chain_end | format_docs |  | [Document(...)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool |  | {"x": 1, "y": "2"} |  |
| on_tool_stream | some_tool | {"x": 1, "y": "2"} |  |  |
| on_tool_end | some_tool |  |  | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] |  | {"query": "hello"} |  |
| on_retriever_chunk | [retriever name] | {documents: [...]} |  |  |
| on_retriever_end | [retriever name] |  | {"query": "hello"} | {documents: [...]} |
| on_prompt_start | [template_name] |  | {"question": "hello"} |  |
| on_prompt_end | [template_name] |  | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |

streamEvents will also emit dispatched custom events in v2. Please see this guide for more.
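For a quick sense of what that looks like, here is a sketch (assuming a Node environment where async context is available; the event name and step are illustrative):

import { RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";

// A custom step that reports progress partway through its work.
const slowStep = RunnableLambda.from(async (input: string) => {
  await dispatchCustomEvent("my_progress_event", { status: "halfway done" });
  return input.toUpperCase();
});

const customEventStream = await slowStep.streamEvents("hello", { version: "v2" });

for await (const event of customEventStream) {
  if (event.event === "on_custom_event") {
    console.log(event.name, event.data); // my_progress_event { status: 'halfway done' }
  }
}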

Chat Model

Let's start off by looking at the events produced by a chat model.

const events = [];

const eventStream = await model.streamEvents("hello", { version: "v2" });

for await (const event of eventStream) {
  events.push(event);
}

console.log(events.length);
25
Note

Hey, what's that funny version: "v2" parameter in the API?! 😾

This is a beta API, and we're almost certainly going to make some changes to it.

This version parameter will allow us to minimize such breaking changes to your code.

In short, we are annoying you now, so we don't have to annoy you later.

Let's take a look at some of the start events and some of the end events.

events.slice(0, 3);
[
{
event: 'on_chat_model_start',
data: { input: 'hello' },
name: 'ChatOpenAI',
tags: [],
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
data: { chunk: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
}
]
events.slice(-2);
[
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
},
{
event: 'on_chat_model_end',
data: { output: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
]

Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.

const chain = model.pipe(new JsonOutputParser());
const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" }
);

const events = [];
for await (const event of eventStream) {
  events.push(event);
}

console.log(events.length);
83

If you look at the first few events, you'll notice that there are 3 different start events rather than 2 start events.

The three start events correspond to:

  1. The chain (model + parser)
  2. The model
  3. The parser
events.slice(0, 3);
[
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [],
run_id: '5dd960b8-4341-4401-8993-7d04d49fcc08',
metadata: {}
},
{
event: 'on_chat_model_start',
data: { input: [Object] },
name: 'ChatOpenAI',
tags: [ 'seq:step:1' ],
run_id: '5d2917b1-886a-47a1-807d-8a0ba4cb4f65',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_parser_start',
data: {},
name: 'JsonOutputParser',
tags: [ 'seq:step:2' ],
run_id: '756c57d6-d455-484f-a556-79a82c4e1d40',
metadata: {}
}
]

What do you think you'd see if you looked at the last 3 events? What about the middle?
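You can check for yourself by slicing the collected events array, for example:

// Inspect the last three events (or any middle slice) collected above.
events.slice(-3);
events.slice(40, 43);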

Let's use this API to stream the output events from the model and the parser. We're ignoring start events, end events, and events from the chain.

let eventCount = 0;

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" }
);

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_chat_model_stream") {
    console.log(`Chat model chunk: ${event.data.chunk.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  }
  eventCount += 1;
}
Chat model chunk:
Chat model chunk: ```
Chat model chunk: json
Chat model chunk:

Chat model chunk: {

Chat model chunk:
Chat model chunk: "
Chat model chunk: countries
Chat model chunk: ":
Chat model chunk: [

Chat model chunk:
Chat model chunk: {

Chat model chunk:
Chat model chunk: "
Chat model chunk: name
Chat model chunk: ":
Chat model chunk: "
Chat model chunk: France
Chat model chunk: ",

Chat model chunk:
Chat model chunk: "
Chat model chunk: population
Chat model chunk: ":
Chat model chunk:
Chat model chunk: 652
Chat model chunk: 735
Chat model chunk: 11
Chat model chunk:

Because both the model and the parser support streaming, we see streaming events from both components in real time! Pretty neat! 🦜

Filtering Events

Because this API produces so many events, it is useful to be able to filter on events.

You can filter by either component name, component tags, or component type.

By Name

const chain = model
  .withConfig({ runName: "model" })
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeNames: ["my_parser"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: 'on_parser_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'my_parser',
tags: [ 'seq:step:2' ],
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
metadata: {}
}
{
event: 'on_parser_stream',
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {},
data: { chunk: { countries: [Array] } }
}
{
event: 'on_parser_end',
data: { output: { countries: [Array] } },
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {}
}

By Type

const chain = model
  .withConfig({ runName: "model" })
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeTypes: ["chat_model"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: 'on_chat_model_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'model',
tags: [ 'seq:step:1' ],
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '```',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'json',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '{\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' ',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' "',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'countries',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '":',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' [\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}

By Tags

Note

Tags are inherited by child components of a given runnable.

If you're using tags to filter, make sure that this is what you want.

const chain = model
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
  .withConfig({ tags: ["my_chain"] });

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeTags: ["my_chain"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [ 'my_chain' ],
run_id: '1fed60d6-e0b7-4d5e-8ec7-cd7d3ee5c69f',
metadata: {}
}
{
event: 'on_chat_model_start',
data: { input: { messages: [Array] } },
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_parser_start',
data: {},
name: 'my_parser',
tags: [ 'seq:step:2', 'my_chain' ],
run_id: 'caf24a1e-255c-4937-9f38-6e46275d854a',
metadata: {}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'Certainly',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '!',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: " Here's",
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' the',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' JSON',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' format',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' output',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}

Streaming events over HTTP

For convenience, streamEvents supports encoding streamed intermediate events as HTTP server-sent events, encoded as bytes. Here's what that looks like (using a TextDecoder to convert the binary data back into a human-readable string):

const chain = model
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
  .withConfig({ tags: ["my_chain"] });

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  {
    version: "v2",
    encoding: "text/event-stream",
  }
);

let eventCount = 0;

const textDecoder = new TextDecoder();

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 3) {
    continue;
  }
  console.log(textDecoder.decode(event));
  eventCount += 1;
}
event: data
data: {"event":"on_chain_start","data":{"input":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\""},"name":"RunnableSequence","tags":["my_chain"],"run_id":"41cd92f8-9b8c-4365-8aa0-fda3abdae03d","metadata":{}}


event: data
data: {"event":"on_chat_model_start","data":{"input":{"messages":[[{"lc":1,"type":"constructor","id":["langchain_core","messages","HumanMessage"],"kwargs":{"content":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\"","additional_kwargs":{},"response_metadata":{}}}]]}},"name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}


event: data
data: {"event":"on_parser_start","data":{},"name":"my_parser","tags":["seq:step:2","my_chain"],"run_id":"402533c5-0e4e-425d-a556-c30a350972d0","metadata":{}}


event: data
data: {"event":"on_chat_model_stream","data":{"chunk":{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","tool_call_chunks":[],"additional_kwargs":{},"id":"chatcmpl-9lO9BAQwbKDy2Ou2RNFUVi0VunAsL","tool_calls":[],"invalid_tool_calls":[],"response_metadata":{"prompt":0,"completion":0,"finish_reason":null}}}},"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}

A nice feature of this format is that you can pass the resulting stream directly into a native HTTP Response object with the correct headers (commonly used by frameworks like Hono and Next.js), then parse that stream on the frontend. Your server-side handler would look something like this:

const handler = async () => {
  const eventStream = await chain.streamEvents(
    `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
    {
      version: "v2",
      encoding: "text/event-stream",
    }
  );
  return new Response(eventStream, {
    headers: {
      "content-type": "text/event-stream",
    },
  });
};

And your frontend could look like this (using the @microsoft/fetch-event-source package to fetch and parse the event source):

import { fetchEventSource } from "@microsoft/fetch-event-source";

const makeChainRequest = async () => {
  await fetchEventSource("https://your_url_here", {
    method: "POST",
    body: JSON.stringify({
      foo: "bar",
    }),
    onmessage: (message) => {
      if (message.event === "data") {
        console.log(message.data);
      }
    },
    onerror: (err) => {
      console.log(err);
    },
  });
};

Non-streaming components

Remember how some components don't stream well because they don't operate on input streams?

While such components can break streaming of the final output when using stream, streamEvents will still yield streaming events from the intermediate steps that support streaming!

import { JsonOutputParser } from "@langchain/core/output_parsers";

// A function that operates on finalized inputs rather than on an input
// stream. Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
};

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
["France","Spain","Japan"]

As expected, the stream API doesn't work correctly here because extractCountryNames doesn't operate on streams.

Now, let's confirm that with streamEvents we're still seeing streaming output from the model and the parser.

const eventStream = await chain.streamEvents(
  `output a list of the countries france, spain and japan and their populations in JSON format.
Use a dict with an outer key of "countries" which contains a list of countries.
Each country should have the key "name" and "population"
Your output should ONLY contain valid JSON data. Do not include any other text or content in your output.`,
  { version: "v2" }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_chat_model_stream") {
    console.log(`Chat model chunk: ${event.data.chunk.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  } else {
    console.log(eventType);
  }
  eventCount += 1;
}

Chat model chunk: 
Chat model chunk: Here
Chat model chunk:  is
Chat model chunk:  how
Chat model chunk:  you
Chat model chunk:  can
Chat model chunk:  represent
Chat model chunk:  the
Chat model chunk:  countries
Chat model chunk:  France
Chat model chunk: ,
Chat model chunk:  Spain
Chat model chunk: ,
Chat model chunk:  and
Chat model chunk:  Japan
Chat model chunk: ,
Chat model chunk:  along
Chat model chunk:  with
Chat model chunk:  their
Chat model chunk:  populations
Chat model chunk: ,
Chat model chunk:  in
Chat model chunk:  JSON
Chat model chunk:  format
Chat model chunk: :

Chat model chunk: ```
Chat model chunk: json
Chat model chunk: 

Chat model chunk: {

