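How to stream

Prerequisites

This guide assumes familiarity with the concepts it builds on: chat models, the LangChain Expression Language (LCEL), and output parsers.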

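Streaming is critical for making LLM-based applications feel responsive to end users.

Important LangChain primitives such as LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable interface.

This interface provides two general approaches to streaming content:

  • .stream(): the default implementation of streaming, which streams the final output from the chain.
  • .streamEvents() and .streamLog(): these provide a way to stream both intermediate steps and the final output from the chain.

Let's take a look at both approaches!

For a higher-level overview of streaming techniques in LangChain, see the relevant section of the conceptual guide.

Using stream

All Runnable objects implement a method called stream.

This method is designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if every step in the program knows how to process an **input stream**, that is, process one input chunk at a time and yield a corresponding output chunk.

The complexity of this processing varies, from straightforward tasks such as emitting the tokens produced by an LLM to more challenging ones such as streaming parts of a JSON result before the entire JSON is complete.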
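To make the idea of "processing an input stream" more concrete, here is a minimal, library-independent sketch built on plain async generators. The names fakeModel, upperCase, reverseAll, and collect are hypothetical and are not part of LangChain; the point is only to contrast a step that yields one output chunk per input chunk with a step that has to buffer its whole input first.

```typescript
// Hypothetical stand-in for a model: yields tokens one at a time.
async function* fakeModel(): AsyncGenerator<string> {
  for (const token of ["Hel", "lo", ",", " wor", "ld"]) {
    yield token;
  }
}

// Streaming-friendly step: transforms each chunk as soon as it arrives.
async function* upperCase(
  input: AsyncIterable<string>
): AsyncGenerator<string> {
  for await (const chunk of input) {
    yield chunk.toUpperCase();
  }
}

// Non-streaming step: it must see the whole input before it can emit anything.
async function* reverseAll(
  input: AsyncIterable<string>
): AsyncGenerator<string> {
  let buffered = "";
  for await (const chunk of input) {
    buffered += chunk;
  }
  yield buffered.split("").reverse().join("");
}

// Helper that drains a stream into an array so the chunks can be inspected.
async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of stream) {
    out.push(item);
  }
  return out;
}

async function demo(): Promise<void> {
  // One output chunk per input chunk.
  console.log(await collect(upperCase(fakeModel())));
  // A single chunk, emitted only after the input is exhausted.
  console.log(await collect(reverseAll(fakeModel())));
}

demo();
```

A consumer of upperCase can start rendering after the first token, while a consumer of reverseAll sees nothing until the very end. That is the behavior the rest of this guide refers to as breaking streaming.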

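The best place to start exploring streaming is with the single most important component in an LLM app: the model itself!

LLMs and chat models

Large language models can take several seconds to generate a complete response to a query. This is far slower than the **~200-300 ms** threshold at which an application feels responsive to an end user.

The key strategy for making the application feel more responsive is to show intermediate progress, for example by streaming the model's output token by token.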

import "dotenv/config";

Pick your chat model

Install dependencies

yarn add @langchain/openai

Add environment variables

OPENAI_API_KEY=your-api-key

Instantiate the model

import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
});

// Stream the response and keep every chunk for later inspection.
const stream = await model.stream("Hello! Tell me about yourself.");
const chunks = [];
for await (const chunk of stream) {
  chunks.push(chunk);
  console.log(`${chunk.content}|`);
}
|
Hello|
!|
I'm|
a|
large|
language|
model|
developed|
by|
Open|
AI|
called|
GPT|
-|
4|
,|
based|
on|
the|
Gener|
ative|
Pre|
-trained|
Transformer|
architecture|
.|
I'm|
designed|
to|
understand|
and|
generate|
human|
-like|
text|
based|
on|
the|
input|
I|
receive|
.|
My|
primary|
function|
is|
to|
assist|
with|
answering|
questions|
,|
providing|
information|
,|
and|
engaging|
in|
various|
types|
of|
conversations|
.|
While|
I|
don't|
have|
personal|
experiences|
or|
emotions|
,|
I'm|
trained|
on|
diverse|
datasets|
that|
enable|
me|
to|
provide|
useful|
and|
relevant|
information|
across|
a|
wide|
array|
of|
topics|
.|
How|
can|
I|
assist|
you|
today|
?|
|
|

Let's have a look at one of the raw chunks:

chunks[0];
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: '',
tool_call_chunks: [],
additional_kwargs: {},
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}

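We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.

Message chunks are additive by design: you can combine them with the .concat() method to get the state of the response so far!

let finalChunk = chunks[0];

// Fold the next four chunks into the first one.
for (const chunk of chunks.slice(1, 5)) {
  finalChunk = finalChunk.concat(chunk);
}

finalChunk;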
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Hello! I'm a",
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_call_chunks: [],
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: []
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: "Hello! I'm a",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}

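Virtually all LLM applications involve more steps than just a call to a language model.

Let's build a simple chain using the LangChain Expression Language (LCEL) that combines a prompt, a model, and a parser, and verify that streaming works.

We will use StringOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.

Tip

LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created with LCEL benefit from an automatic implementation of stream, which allows the final output to be streamed. In fact, chains created with LCEL implement the entire standard Runnable interface.

import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");

const parser = new StringOutputParser();

// Compose the prompt, model, and parser into a single chain.
const chain = prompt.pipe(model).pipe(parser);

const stream = await chain.stream({
  topic: "parrot",
});

for await (const chunk of stream) {
  console.log(`${chunk}|`);
}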
|
Sure|
,|
here's|
a|
joke|
for|
you|
:

|
Why|
did|
the|
par|
rot|
sit|
on|
the|
stick|
?

|
Because|
it|
wanted|
to|
be|
a|
"|
pol|
ly|
-stick|
-al|
"|
observer|
!|
|
|
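Note

You do not have to use the LangChain Expression Language to use LangChain. You can instead rely on a standard **imperative** programming approach by calling invoke, batch, or stream on each component individually, assigning the results to variables, and then using them downstream as you see fit.

If that works for your needs, then that's fine by us 👌!

Working with input streams

What if you wanted to stream JSON from the output as it was being generated?

If you relied on JSON.parse to parse the partial JSON, the parsing would fail because partial JSON is not valid JSON.

You would likely be at a complete loss for what to do and claim that it is not possible to stream JSON.

Well, it turns out there is a way to do it: the parser needs to operate on the **input stream** and attempt to "auto-complete" the partial JSON into a valid state.

Let's see such a parser in action to understand what this means.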

import { JsonOutputParser } from "@langchain/core/output_parsers";

const chain = model.pipe(new JsonOutputParser());
const stream = await chain.stream(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
{
countries: [
{ name: 'France', population: 67390000 },
{ name: 'Spain', population: 47350000 },
{ name: 'Japan', population: 125800000 }
]
}
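To give a feel for what "auto-completing" partial JSON can involve, here is a deliberately naive sketch. It is not LangChain's implementation, and the function name completePartialJson is hypothetical. It closes any string, array, or object that is still open and then tries JSON.parse; if the result is still invalid (for example, the text ends right after a colon), it returns undefined so the caller can wait for more input.

```typescript
// Naive sketch: close whatever is still open, then try to parse.
function completePartialJson(partial: string): unknown {
  const closers: string[] = [];
  let inString = false;
  let escaped = false;

  for (let i = 0; i < partial.length; i++) {
    const ch = partial[i];
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === "\\") escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === "{") closers.push("}");
    else if (ch === "[") closers.push("]");
    else if (ch === "}" || ch === "]") closers.pop();
  }

  // Close an unterminated string first, then the open containers, innermost first.
  const candidate =
    partial + (inString ? '"' : "") + closers.reverse().join("");
  try {
    return JSON.parse(candidate);
  } catch {
    return undefined; // not recoverable yet; wait for more input
  }
}

// Feed the text in pieces, the way tokens would arrive from a model.
const pieces = [
  '{"countries": [{"name": "Fra',
  'nce", "population": 67',
  "390000}]}",
];
let buffer = "";
for (const piece of pieces) {
  buffer += piece;
  console.log(JSON.stringify(completePartialJson(buffer)));
}
```

Each intermediate result is valid JSON even though the underlying text is incomplete, which is why a downstream consumer can render values such as a half-written name or a growing number before the model has finished.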

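Now, let's **break** streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON. Since this new last step is just a function call with no defined streaming behavior, the streaming output from previous steps is aggregated, then passed as a single input to the function.

Danger

Any step in the chain that operates on **finalized inputs** rather than on **input streams** can break streaming functionality via stream.

Tip

Later, we will discuss the streamEvents API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on **finalized inputs**.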

// A function that operates on finalized inputs rather than on an input stream.
// Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
};

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
["France","Spain","Japan"]
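For contrast, here is a sketch of what a stream-aware version of the same extraction step could look like. It is a plain async generator and is independent of LangChain: the names extractCountryNamesStreaming and fakeParserOutput are hypothetical, and the simulated input only assumes that the parser yields progressively more complete objects. How such a generator would be wired into a chain is not covered here.

```typescript
type Country = { name?: string; population?: number };
type PartialResult = { countries?: Country[] };

// Yields each country name once, as soon as that entry can no longer change.
async function* extractCountryNamesStreaming(
  input: AsyncIterable<PartialResult>
): AsyncGenerator<string> {
  let emitted = 0;
  let latest: Country[] = [];
  for await (const partial of input) {
    latest = Array.isArray(partial.countries) ? partial.countries : [];
    // Every entry except the last is complete, because a newer entry has started.
    while (emitted < latest.length - 1) {
      const name = latest[emitted].name;
      if (name !== undefined) yield name;
      emitted += 1;
    }
  }
  // Once the input is exhausted, the remaining entries are final as well.
  while (emitted < latest.length) {
    const name = latest[emitted].name;
    if (name !== undefined) yield name;
    emitted += 1;
  }
}

// Simulated parser output: each value is a more complete snapshot of the JSON.
async function* fakeParserOutput(): AsyncGenerator<PartialResult> {
  yield {};
  yield { countries: [{ name: "Fra" }] };
  yield { countries: [{ name: "France", population: 67390000 }, { name: "Sp" }] };
  yield {
    countries: [
      { name: "France", population: 67390000 },
      { name: "Spain", population: 47350000 },
      { name: "Japan", population: 125800000 },
    ],
  };
}

async function main(): Promise<void> {
  for await (const name of extractCountryNamesStreaming(fakeParserOutput())) {
    console.log(name);
  }
}

main();
```

The design choice here is to hold back the last entry until a newer one appears, so that a half-written name such as "Fra" is never emitted. A simpler variant could emit names eagerly and accept partial values.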

Non-streaming components

As in the example above, some built-in components such as retrievers do not offer any streaming. What happens if we try to stream them?

import { OpenAIEmbeddings } from "@langchain/openai";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const template = `Answer the question based only on the following context:
{context}

Question: {question}
`;
const prompt = ChatPromptTemplate.fromTemplate(template);

const vectorstore = await MemoryVectorStore.fromTexts(
  ["mitochondria is the powerhouse of the cell", "buildings are made of brick"],
  [{}, {}],
  new OpenAIEmbeddings()
);

const retriever = vectorstore.asRetriever();

const chunks = [];

for await (const chunk of await retriever.stream(
  "What is the powerhouse of the cell?"
)) {
  chunks.push(chunk);
}

console.log(chunks);
[
[
Document {
pageContent: 'mitochondria is the powerhouse of the cell',
metadata: {},
id: undefined
},
Document {
pageContent: 'buildings are made of brick',
metadata: {},
id: undefined
}
]
]

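stream just yielded the final result from that component.

This is OK! Not all components have to implement streaming: in some cases streaming is unnecessary, difficult, or just doesn't make sense.

Tip

An LCEL chain constructed using some non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.

Here's an example of this: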

import {
  RunnablePassthrough,
  RunnableSequence,
} from "@langchain/core/runnables";
import type { Document } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";

// Join the retrieved documents into a single context string.
const formatDocs = (docs: Document[]) => {
  return docs.map((doc) => doc.pageContent).join("\n-----\n");
};

const retrievalChain = RunnableSequence.from([
  {
    context: retriever.pipe(formatDocs),
    question: new RunnablePassthrough(),
  },
  prompt,
  model,
  new StringOutputParser(),
]);

const stream = await retrievalChain.stream(
  "What is the powerhouse of the cell?"
);

for await (const chunk of stream) {
  console.log(`${chunk}|`);
}
|
M|
ito|
ch|
ond|
ria|
is|
the|
powerhouse|
of|
the|
cell|
.|
|
|

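Now that we've seen how the stream method works, let's venture into the world of streaming events!

Using stream events

Event streaming is a **beta** API. This API may change a bit based on feedback.

Note

Introduced in @langchain/core **0.1.27**.

For the streamEvents method to work properly:

  • Any custom functions / runnables must propagate callbacks.
  • Set the proper parameters on models to force the LLM to stream tokens.

Let us know if anything doesn't work as expected!

Event reference

Below is a reference table that shows some events that might be emitted by the various Runnable objects.

Note

When streaming is implemented properly, the inputs to a Runnable are not known until after the input stream has been entirely consumed. This means that inputs are often included only in end events rather than in start events.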

| event | name | chunk | input | output |
| --- | --- | --- | --- | --- |
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' or AIMessageChunk(content="hello") | | |
| on_llm_end | [model name] | | 'Hello human!' | {"generations": [...], "llmOutput": None, ...} |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_stream | some_tool | {"x": 1, "y": "2"} | | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_chunk | [retriever name] | {documents: [...]} | | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | {documents: [...]} |
| on_prompt_start | [template name] | | {"question": "hello"} | |
| on_prompt_end | [template name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |

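streamEvents will also emit dispatched custom events in v2. See the guide on custom events for more information.

Chat model

Let's start off by looking at the events produced by a chat model.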

const events = [];

const eventStream = await model.streamEvents("hello", { version: "v2" });

for await (const event of eventStream) {
  events.push(event);
}

console.log(events.length);
25
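
Note

Hey, what's that funny version: "v2" parameter in the API?! 😾

This is a **beta API**, and we're almost certainly going to make some changes to it.

This version parameter will allow us to minimize such breaking changes to your code.

In short, we are annoying you now, so we don't have to annoy you later.

Let's take a look at a few of the start events and a few of the end events.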

events.slice(0, 3);
[
{
event: 'on_chat_model_start',
data: { input: 'hello' },
name: 'ChatOpenAI',
tags: [],
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
data: { chunk: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
}
]
events.slice(-2);
[
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
},
{
event: 'on_chat_model_end',
data: { output: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
]

Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.

const chain = model.pipe(new JsonOutputParser());
const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" }
);

const events = [];
for await (const event of eventStream) {
  events.push(event);
}

console.log(events.length);
83

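If you examine the first few events, you'll notice that there are **3** different start events rather than **2**.

The three start events correspond to:

  1. The chain (model + parser)
  2. The model
  3. The parser

events.slice(0, 3);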
[
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [],
run_id: '5dd960b8-4341-4401-8993-7d04d49fcc08',
metadata: {}
},
{
event: 'on_chat_model_start',
data: { input: [Object] },
name: 'ChatOpenAI',
tags: [ 'seq:step:1' ],
run_id: '5d2917b1-886a-47a1-807d-8a0ba4cb4f65',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_parser_start',
data: {},
name: 'JsonOutputParser',
tags: [ 'seq:step:2' ],
run_id: '756c57d6-d455-484f-a556-79a82c4e1d40',
metadata: {}
}
]

What do you think you'd see if you looked at the last 3 events? What about the middle?

Let's use this API to output the stream events from the model and the parser. We're ignoring start events, end events, and events from the chain.

let eventCount = 0;

// Note: this call uses the v1 event schema, so the model's chunks are read
// from on_llm_stream events via event.data.chunk.message.
const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" }
);

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_llm_stream") {
    console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  }
  eventCount += 1;
}
Chat model chunk:
Chat model chunk: ```
Chat model chunk: json
Chat model chunk:

Chat model chunk: {

Chat model chunk:
Chat model chunk: "
Chat model chunk: countries
Chat model chunk: ":
Chat model chunk: [

Chat model chunk:
Chat model chunk: {

Chat model chunk:
Chat model chunk: "
Chat model chunk: name
Chat model chunk: ":
Chat model chunk: "
Chat model chunk: France
Chat model chunk: ",

Chat model chunk:
Chat model chunk: "
Chat model chunk: population
Chat model chunk: ":
Chat model chunk:
Chat model chunk: 652
Chat model chunk: 735
Chat model chunk: 11
Chat model chunk:

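Because both the model and the parser support streaming, we see streaming events from both components in real time! Neat! 🦜

Filtering events

Because this API produces so many events, it is useful to be able to filter them.

You can filter by component name, component tags, or component type.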
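As a mental model for these three filters, the sketch below applies the same idea to a plain array of event-like objects. It is an illustration only: the EventLike and EventFilter types and the matches and eventType helpers are hypothetical, and the assumption that an event is kept when it satisfies any one of the supplied include filters is a simplification rather than a statement about the library's internals.

```typescript
// Hypothetical, simplified event shape modeled on the events printed in this guide.
interface EventLike {
  event: string; // e.g. "on_chat_model_stream"
  name: string;
  tags: string[];
}

interface EventFilter {
  includeNames?: string[];
  includeTypes?: string[];
  includeTags?: string[];
}

// Derive the component type from the event name:
// "on_chat_model_stream" -> "chat_model"
function eventType(event: string): string {
  return event.replace(/^on_/, "").replace(/_(start|stream|chunk|end)$/, "");
}

// Assumption: with no filters everything passes; otherwise any match is enough.
function matches(e: EventLike, f: EventFilter): boolean {
  const hasFilter = Boolean(f.includeNames || f.includeTypes || f.includeTags);
  if (!hasFilter) return true;
  return (
    (f.includeNames?.includes(e.name) ?? false) ||
    (f.includeTypes?.includes(eventType(e.event)) ?? false) ||
    (f.includeTags?.some((tag) => e.tags.includes(tag)) ?? false)
  );
}

const sampleEvents: EventLike[] = [
  { event: "on_chain_start", name: "RunnableSequence", tags: ["my_chain"] },
  { event: "on_chat_model_start", name: "model", tags: ["seq:step:1", "my_chain"] },
  { event: "on_parser_start", name: "my_parser", tags: ["seq:step:2", "my_chain"] },
];

const byName = sampleEvents.filter((e) => matches(e, { includeNames: ["my_parser"] }));
const byType = sampleEvents.filter((e) => matches(e, { includeTypes: ["chat_model"] }));
const byTag = sampleEvents.filter((e) => matches(e, { includeTags: ["my_chain"] }));
console.log(byName.length, byType.length, byTag.length);
```

Note how the tag filter keeps all three sample events: the child components carry the parent's tag, so filtering by a chain-level tag also selects events from the steps inside that chain.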

By name

const chain = model
  .withConfig({ runName: "model" })
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeNames: ["my_parser"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: 'on_parser_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'my_parser',
tags: [ 'seq:step:2' ],
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
metadata: {}
}
{
event: 'on_parser_stream',
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {},
data: { chunk: { countries: [Array] } }
}
{
event: 'on_parser_end',
data: { output: { countries: [Array] } },
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {}
}

By type

const chain = model
  .withConfig({ runName: "model" })
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeTypes: ["chat_model"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: 'on_chat_model_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'model',
tags: [ 'seq:step:1' ],
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '```',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'json',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '{\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' ',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' "',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'countries',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '":',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' [\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}

By tags

Note

Tags are inherited by child components of a given runnable.

If you're using tags to filter, make sure that this is what you want.

const chain = model
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
  .withConfig({ tags: ["my_chain"] });

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeTags: ["my_chain"] }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [ 'my_chain' ],
run_id: '1fed60d6-e0b7-4d5e-8ec7-cd7d3ee5c69f',
metadata: {}
}
{
event: 'on_chat_model_start',
data: { input: { messages: [Array] } },
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_parser_start',
data: {},
name: 'my_parser',
tags: [ 'seq:step:2', 'my_chain' ],
run_id: 'caf24a1e-255c-4937-9f38-6e46275d854a',
metadata: {}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'Certainly',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '!',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: " Here's",
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' the',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' JSON',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' format',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' output',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}

Streaming events over HTTP

For convenience, streamEvents supports encoding streamed intermediate events as HTTP server-sent events, encoded as bytes. Here's what that looks like (using a TextDecoder to convert the binary data back into a human-readable string):

const chain = model
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
  .withConfig({ tags: ["my_chain"] });

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  {
    version: "v2",
    encoding: "text/event-stream",
  }
);

let eventCount = 0;

const textDecoder = new TextDecoder();

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 3) {
    continue;
  }
  console.log(textDecoder.decode(event));
  eventCount += 1;
}
event: data
data: {"event":"on_chain_start","data":{"input":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\""},"name":"RunnableSequence","tags":["my_chain"],"run_id":"41cd92f8-9b8c-4365-8aa0-fda3abdae03d","metadata":{}}


event: data
data: {"event":"on_chat_model_start","data":{"input":{"messages":[[{"lc":1,"type":"constructor","id":["langchain_core","messages","HumanMessage"],"kwargs":{"content":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\"","additional_kwargs":{},"response_metadata":{}}}]]}},"name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}


event: data
data: {"event":"on_parser_start","data":{},"name":"my_parser","tags":["seq:step:2","my_chain"],"run_id":"402533c5-0e4e-425d-a556-c30a350972d0","metadata":{}}


event: data
data: {"event":"on_chat_model_stream","data":{"chunk":{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","tool_call_chunks":[],"additional_kwargs":{},"id":"chatcmpl-9lO9BAQwbKDy2Ou2RNFUVi0VunAsL","tool_calls":[],"invalid_tool_calls":[],"response_metadata":{"prompt":0,"completion":0,"finish_reason":null}}}},"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}

A nice feature of this format is that you can pass the resulting stream directly into a native HTTP response object with the correct headers (commonly used by frameworks such as Hono and Next.js), then parse that stream on the frontend. Your server-side handler would look something like this:

const handler = async () => {
  const eventStream = await chain.streamEvents(
    `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
    {
      version: "v2",
      encoding: "text/event-stream",
    }
  );
  // The encoded stream can be used directly as the response body.
  return new Response(eventStream, {
    headers: {
      "content-type": "text/event-stream",
    },
  });
};

And your frontend could look like this (using the @microsoft/fetch-event-source package to fetch and parse the event source):

import { fetchEventSource } from "@microsoft/fetch-event-source";

const makeChainRequest = async () => {
  await fetchEventSource("https://your_url_here", {
    method: "POST",
    body: JSON.stringify({
      foo: "bar",
    }),
    onmessage: (message) => {
      // Each streamed LangChain event arrives as an SSE message named "data".
      if (message.event === "data") {
        console.log(message.data);
      }
    },
    onerror: (err) => {
      console.log(err);
    },
  });
};
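If you are curious what a client has to do with the raw text shown earlier, the following sketch parses a complete server-sent-events payload by hand. It is a simplified illustration rather than a replacement for a real client: parseSse and SseMessage are hypothetical names, the function assumes the whole payload is already in memory, and it ignores SSE features such as comments, id, and retry fields.

```typescript
interface SseMessage {
  event: string;
  data: string;
}

// Split the payload into blank-line-separated blocks and read their fields.
function parseSse(text: string): SseMessage[] {
  const messages: SseMessage[] = [];
  for (const block of text.split(/\r?\n\r?\n/)) {
    let event = "message"; // default event name when no "event:" field is present
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith("event:")) {
        event = line.slice("event:".length).trim();
      } else if (line.startsWith("data:")) {
        // A single leading space after the colon is not part of the value.
        dataLines.push(line.slice("data:".length).replace(/^ /, ""));
      }
    }
    if (dataLines.length > 0) {
      messages.push({ event, data: dataLines.join("\n") });
    }
  }
  return messages;
}

// Shortened payload in the same shape as the output shown earlier.
const rawPayload =
  'event: data\ndata: {"event":"on_parser_start","name":"my_parser"}\n\n' +
  'event: data\ndata: {"event":"on_chat_model_stream","name":"ChatOpenAI"}\n\n';

for (const message of parseSse(rawPayload)) {
  const parsed = JSON.parse(message.data);
  console.log(message.event, parsed.event, parsed.name);
}
```

A production client also has to handle payloads that arrive split across network chunks, which is one reason to rely on a dedicated package on the frontend.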

Non-streaming components

Remember how some components don't stream well because they don't operate on **input streams**?

While such components can break streaming of the final output when using stream, streamEvents will still yield streaming events from intermediate steps that support streaming!

import { JsonOutputParser } from "@langchain/core/output_parsers";

// A function that operates on finalized inputs rather than on an input stream.
// Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
};

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}
["France","Spain","Japan"]

As expected, the stream API doesn't work correctly because extractCountryNames doesn't operate on streams.

Now, let's confirm that with streamEvents we're still seeing streaming output from the model and the parser.

const eventStream = await chain.streamEvents(
  `output a list of the countries france, spain and japan and their populations in JSON format.
Use a dict with an outer key of "countries" which contains a list of countries.
Each country should have the key "name" and "population"
Your output should ONLY contain valid JSON data. Do not include any other text or content in your output.`,
  { version: "v2" }
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_chat_model_stream") {
    // In the v2 schema the chunk is an AIMessageChunk, so read content directly.
    console.log(`Chat model chunk: ${event.data.chunk.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  } else {
    console.log(eventType);
  }
  eventCount += 1;
}

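Chat model chunk:
Chat model chunk: Here
Chat model chunk: how
Chat model chunk: you
Chat model chunk: can
Chat model chunk: represent
Chat model chunk: the countries
Chat model chunk: France
Chat model chunk: ,
Chat model chunk: Spain
Chat model chunk: ,
Chat model chunk: and
Chat model chunk: Japan
Chat model chunk: ,
Chat model chunk: along with
Chat model chunk: their
Chat model chunk: populations
Chat model chunk: ,
Chat model chunk: in
Chat model chunk: JSON
Chat model chunk: format
Chat model chunk:

Chat model chunk: ```
Chat model chunk: json
Chat model chunk:

Chat model chunk: {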

