How to stream
This guide assumes familiarity with the following concepts:
Streaming is critical in making applications based on LLMs feel responsive to end-users.
Important LangChain primitives like LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.
This interface provides two general approaches to stream content:
- .stream(): a default implementation of streaming that streams the final output from the chain.
- streamEvents() and streamLog(): these provide a way to stream both intermediate steps and the final output from the chain.
Let's take a look at both approaches!
For a higher-level overview of streaming techniques in LangChain, see this section of the conceptual guide.
Using Stream
All Runnable objects implement a method called stream.
These methods are designed to stream the final output in chunks, yielding each chunk as soon as it is available.
Streaming is only possible if all the steps in the program know how to process an **input stream**; i.e., process an input chunk one at a time, and yield a corresponding output chunk.
The complexity of this processing can vary, from straightforward tasks like emitting tokens produced by an LLM, to more challenging ones like streaming parts of JSON results before the entire JSON is complete.
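As a rough illustration in plain TypeScript (not a specific LangChain API), a step that knows how to operate on an input stream can be thought of as an async generator that consumes one chunk at a time and yields a corresponding output chunk:
// A minimal sketch: this hypothetical transform uppercases text chunks
// as they arrive, so downstream consumers never wait for the full input.
async function* uppercaseChunks(
  inputStream: AsyncIterable<string>
): AsyncGenerator<string> {
  for await (const chunk of inputStream) {
    // One input chunk in, one output chunk out.
    yield chunk.toUpperCase();
  }
}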
The best place to start exploring streaming is with the single most important component in LLM apps: the models themselves!
LLMs and Chat Models
Large language models can take several seconds to generate a complete response to a query. This is far slower than the **~200-300 ms** threshold at which an application feels responsive to an end user.
The key strategy to make the application feel more responsive is to show intermediate progress; e.g., to stream the output from the model token by token.
import "dotenv/config";
Pick your chat model:
- OpenAI
- Anthropic
- FireworksAI
- MistralAI
- Groq
- VertexAI
Install dependencies
See this section for general instructions on installing integration packages.
- npm
- yarn
- pnpm
npm i @langchain/openai
yarn add @langchain/openai
pnpm add @langchain/openai
Add environment variables
OPENAI_API_KEY=your-api-key
Instantiate the model
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "gpt-4o-mini",
temperature: 0
});
Install dependencies
See this section for general instructions on installing integration packages.
- npm
- yarn
- pnpm
npm i @langchain/anthropic
yarn add @langchain/anthropic
pnpm add @langchain/anthropic
Add environment variables
ANTHROPIC_API_KEY=your-api-key
Instantiate the model
import { ChatAnthropic } from "@langchain/anthropic";
const model = new ChatAnthropic({
model: "claude-3-5-sonnet-20240620",
temperature: 0
});
Install dependencies
See this section for general instructions on installing integration packages.
- npm
- yarn
- pnpm
npm i @langchain/community
yarn add @langchain/community
pnpm add @langchain/community
Add environment variables
FIREWORKS_API_KEY=your-api-key
Instantiate the model
import { ChatFireworks } from "@langchain/community/chat_models/fireworks";
const model = new ChatFireworks({
model: "accounts/fireworks/models/llama-v3p1-70b-instruct",
temperature: 0
});
Install dependencies
See this section for general instructions on installing integration packages.
- npm
- yarn
- pnpm
npm i @langchain/mistralai
yarn add @langchain/mistralai
pnpm add @langchain/mistralai
Add environment variables
MISTRAL_API_KEY=your-api-key
Instantiate the model
import { ChatMistralAI } from "@langchain/mistralai";
const model = new ChatMistralAI({
model: "mistral-large-latest",
temperature: 0
});
Install dependencies
See this section for general instructions on installing integration packages.
- npm
- yarn
- pnpm
npm i @langchain/groq
yarn add @langchain/groq
pnpm add @langchain/groq
Add environment variables
GROQ_API_KEY=your-api-key
Instantiate the model
import { ChatGroq } from "@langchain/groq";
const model = new ChatGroq({
model: "mixtral-8x7b-32768",
temperature: 0
});
Install dependencies
See this section for general instructions on installing integration packages.
- npm
- yarn
- pnpm
npm i @langchain/google-vertexai
yarn add @langchain/google-vertexai
pnpm add @langchain/google-vertexai
Add environment variables
GOOGLE_APPLICATION_CREDENTIALS=credentials.json
Instantiate the model
import { ChatVertexAI } from "@langchain/google-vertexai";
const model = new ChatVertexAI({
model: "gemini-1.5-flash",
temperature: 0
});
const stream = await model.stream("Hello! Tell me about yourself.");
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
console.log(`${chunk.content}|`);
}
|
Hello|
!|
I'm|
a|
large|
language|
model|
developed|
by|
Open|
AI|
called|
GPT|
-|
4|
,|
based|
on|
the|
Gener|
ative|
Pre|
-trained|
Transformer|
architecture|
.|
I'm|
designed|
to|
understand|
and|
generate|
human|
-like|
text|
based|
on|
the|
input|
I|
receive|
.|
My|
primary|
function|
is|
to|
assist|
with|
answering|
questions|
,|
providing|
information|
,|
and|
engaging|
in|
various|
types|
of|
conversations|
.|
While|
I|
don't|
have|
personal|
experiences|
or|
emotions|
,|
I'm|
trained|
on|
diverse|
datasets|
that|
enable|
me|
to|
provide|
useful|
and|
relevant|
information|
across|
a|
wide|
array|
of|
topics|
.|
How|
can|
I|
assist|
you|
today|
?|
|
|
Let's have a look at one of the raw chunks:
chunks[0];
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: '',
tool_call_chunks: [],
additional_kwargs: {},
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.
Message chunks are additive by design: one can simply add them up using the .concat() method to get the state of the response so far!
let finalChunk = chunks[0];
for (const chunk of chunks.slice(1, 5)) {
finalChunk = finalChunk.concat(chunk);
}
finalChunk;
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Hello! I'm a",
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_call_chunks: [],
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: []
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: "Hello! I'm a",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
Chains
Virtually all LLM applications involve more steps than just a call to a language model.
Let's build a simple chain using the LangChain Expression Language (LCEL) that combines a prompt, model and parser, and verify that streaming works.
We will use a StringOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.
LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of stream, allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.
import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";
const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");
const parser = new StringOutputParser();
const chain = prompt.pipe(model).pipe(parser);
const stream = await chain.stream({
topic: "parrot",
});
for await (const chunk of stream) {
console.log(`${chunk}|`);
}
|
Sure|
,|
here's|
a|
joke|
for|
you|
:
|
Why|
did|
the|
par|
rot|
sit|
on|
the|
stick|
?
|
Because|
it|
wanted|
to|
be|
a|
"|
pol|
ly|
-stick|
-al|
"|
observer|
!|
|
|
You do not have to use the LangChain Expression Language to use LangChain and can instead rely on a standard **imperative** programming approach by calling invoke, batch or stream on each component individually, assigning the results to variables, and then using them downstream as you see fit.
If that works for your needs, then that's fine by us 👌!
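For illustration, here's a minimal imperative sketch that reuses the prompt, model, and parser defined above, calling each component individually instead of piping them into an LCEL chain:
// Imperative equivalent of prompt.pipe(model).pipe(parser), reusing the
// prompt, model, and parser defined in the previous snippet.
const promptValue = await prompt.invoke({ topic: "parrot" });
const modelStream = await model.stream(promptValue);
for await (const messageChunk of modelStream) {
  // The string parser extracts the text content from each AIMessageChunk.
  const text = await parser.invoke(messageChunk);
  console.log(`${text}|`);
}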
Working with Input Streams
What if you wanted to stream JSON from the output as it was being generated?
If you were to rely on JSON.parse to parse the partial JSON, the parsing would fail, because partial JSON is not valid JSON.
You'd likely be at a complete loss of what to do and claim that it wasn't possible to stream JSON.
Well, it turns out there is a way to do it: the parser needs to operate on the **input stream** and attempt to "auto-complete" the partial JSON into a valid state.
Let's see such a parser in action to understand what this means.
import { JsonOutputParser } from "@langchain/core/output_parsers";
const chain = model.pipe(new JsonOutputParser());
const stream = await chain.stream(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
{
countries: [
{ name: 'France', population: 67390000 },
{ name: 'Spain', population: 47350000 },
{ name: 'Japan', population: 125800000 }
]
}
Now, let's **break** the streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON. Since this new last step is just a function call with no defined streaming behavior, the streaming output from the previous steps is aggregated and then passed to the function as a single input.
Any step in the chain that operates on the **final input** rather than on **input streams** can break streaming functionality via stream.
Later, we'll discuss the streamEvents API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on the **final input**.
// A function that operates on finalized inputs
// rather than on an input stream, and therefore breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
if (!Array.isArray(inputs.countries)) {
return "";
}
return JSON.stringify(inputs.countries.map((country) => country.name));
};
const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);
const stream = await chain.stream(
`output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
["France","Spain","Japan"]
Non-streaming components
Like the example above, some built-in components such as Retrievers do not offer any streaming. What happens if we try to stream them?
import { OpenAIEmbeddings } from "@langchain/openai";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatPromptTemplate } from "@langchain/core/prompts";
const template = `Answer the question based only on the following context:
{context}
Question: {question}
`;
const prompt = ChatPromptTemplate.fromTemplate(template);
const vectorstore = await MemoryVectorStore.fromTexts(
["mitochondria is the powerhouse of the cell", "buildings are made of brick"],
[{}, {}],
new OpenAIEmbeddings()
);
const retriever = vectorstore.asRetriever();
const chunks = [];
for await (const chunk of await retriever.stream(
"What is the powerhouse of the cell?"
)) {
chunks.push(chunk);
}
console.log(chunks);
[
[
Document {
pageContent: 'mitochondria is the powerhouse of the cell',
metadata: {},
id: undefined
},
Document {
pageContent: 'buildings are made of brick',
metadata: {},
id: undefined
}
]
]
Stream just yielded the final result from that component.
That's OK! Not all components have to implement streaming - in some cases streaming is either unnecessary, difficult, or just doesn't make sense.
In many cases, an LCEL chain constructed with some non-streaming components will still be able to stream, with streaming of partial output starting after the last non-streaming step in the chain.
Here's an example:
import {
RunnablePassthrough,
RunnableSequence,
} from "@langchain/core/runnables";
import type { Document } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";
const formatDocs = (docs: Document[]) => {
return docs.map((doc) => doc.pageContent).join("\n-----\n");
};
const retrievalChain = RunnableSequence.from([
{
context: retriever.pipe(formatDocs),
question: new RunnablePassthrough(),
},
prompt,
model,
new StringOutputParser(),
]);
const stream = await retrievalChain.stream(
"What is the powerhouse of the cell?"
);
for await (const chunk of stream) {
console.log(`${chunk}|`);
}
|
M|
ito|
ch|
ond|
ria|
is|
the|
powerhouse|
of|
the|
cell|
.|
|
|
Now that we've seen how the stream method works, let's venture into the world of streaming events!
Using Stream Events
Event streaming is a **beta** API. This API may change a bit based on feedback.
Introduced in @langchain/core 0.1.27.
In order for the streamEvents method to work properly:
- Any custom functions / runnables must propagate callbacks (see the sketch after this list).
- Set proper parameters on models to force the LLM to stream tokens.
- Let us know if anything doesn't work as expected!
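As a sketch of what the first point means (reusing the model from earlier; the wrapper itself is hypothetical), a custom function wrapped in a RunnableLambda should forward its config argument when invoking other runnables inside it, since that is what propagates callbacks to streamEvents:
import { RunnableLambda, type RunnableConfig } from "@langchain/core/runnables";
// Hypothetical wrapper around the model from earlier. Forwarding `config`
// to the nested invoke call is what propagates callbacks, so streamEvents
// can surface events from the inner run.
const wrappedModel = RunnableLambda.from(
  async (input: string, config?: RunnableConfig) => {
    // Dropping `config` here would hide the model's events from streamEvents.
    return model.invoke(input, config);
  }
);
const eventStream = await wrappedModel.streamEvents("hello", { version: "v2" });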
Event Reference
Below is a reference table that shows some events that might be emitted by the various Runnable objects.
When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed. This means that inputs will often be included only for end events, rather than for start events.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' or AIMessageChunk(content="hello") | | |
| on_llm_end | [model name] | | 'Hello human!' | {"generations": [...], "llmOutput": None, ...} |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_stream | some_tool | {"x": 1, "y": "2"} | | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_chunk | [retriever name] | {documents: [...]} | | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | {documents: [...]} |
| on_prompt_start | [template name] | | {"question": "hello"} | |
| on_prompt_end | [template name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
streamEvents will also emit dispatched custom events in v2. Please see this guide for more.
Chat Model
Let's start off by looking at the events produced by a chat model.
const events = [];
const eventStream = await model.streamEvents("hello", { version: "v2" });
for await (const event of eventStream) {
events.push(event);
}
console.log(events.length);
25
Hey, what's that funny version: "v2" parameter in the API?! 😾
This is a **beta API**, and we're almost certainly going to make some changes to it.
This version parameter will allow us to minimize such breaking changes to your code.
In short, we are annoying you now, so we don't have to annoy you later.
Let's take a look at a few of the start events and a few of the end events.
events.slice(0, 3);
[
{
event: 'on_chat_model_start',
data: { input: 'hello' },
name: 'ChatOpenAI',
tags: [],
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
data: { chunk: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
}
]
events.slice(-2);
[
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
},
{
event: 'on_chat_model_end',
data: { output: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
]
Chain
Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.
const chain = model.pipe(new JsonOutputParser());
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" }
);
const events = [];
for await (const event of eventStream) {
events.push(event);
}
console.log(events.length);
83
If you examine the first few events, you'll notice that there are 3 different start events rather than 2 start events.
The three start events correspond to:
- The chain (model + parser)
- The model
- The parser
events.slice(0, 3);
[
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [],
run_id: '5dd960b8-4341-4401-8993-7d04d49fcc08',
metadata: {}
},
{
event: 'on_chat_model_start',
data: { input: [Object] },
name: 'ChatOpenAI',
tags: [ 'seq:step:1' ],
run_id: '5d2917b1-886a-47a1-807d-8a0ba4cb4f65',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_parser_start',
data: {},
name: 'JsonOutputParser',
tags: [ 'seq:step:2' ],
run_id: '756c57d6-d455-484f-a556-79a82c4e1d40',
metadata: {}
}
]
What do you think you'd see if you looked at the last 3 events? What about the middle ones?
Let's use this API to output stream events from the model and the parser, ignoring start events, end events, and events from the chain.
let eventCount = 0;
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" }
);
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 30) {
continue;
}
const eventType = event.event;
if (eventType === "on_llm_stream") {
console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
} else if (eventType === "on_parser_stream") {
console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
}
eventCount += 1;
}
Chat model chunk:
Chat model chunk: ```
Chat model chunk: json
Chat model chunk:
Chat model chunk: {
Chat model chunk:
Chat model chunk: "
Chat model chunk: countries
Chat model chunk: ":
Chat model chunk: [
Chat model chunk:
Chat model chunk: {
Chat model chunk:
Chat model chunk: "
Chat model chunk: name
Chat model chunk: ":
Chat model chunk: "
Chat model chunk: France
Chat model chunk: ",
Chat model chunk:
Chat model chunk: "
Chat model chunk: population
Chat model chunk: ":
Chat model chunk:
Chat model chunk: 652
Chat model chunk: 735
Chat model chunk: 11
Chat model chunk:
Because both the model and the parser support streaming, we see streaming events from both components in real time! Awesome! 🦜
Filtering Events
Because this API produces so many events, it is useful to be able to filter on events.
You can filter by either component name, component tags, or component type.
By Name
const chain = model
.withConfig({ runName: "model" })
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" },
{ includeNames: ["my_parser"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: 'on_parser_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'my_parser',
tags: [ 'seq:step:2' ],
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
metadata: {}
}
{
event: 'on_parser_stream',
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {},
data: { chunk: { countries: [Array] } }
}
{
event: 'on_parser_end',
data: { output: { countries: [Array] } },
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {}
}
By Type
const chain = model
.withConfig({ runName: "model" })
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" },
{ includeTypes: ["chat_model"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: 'on_chat_model_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'model',
tags: [ 'seq:step:1' ],
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '```',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'json',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '{\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' ',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' "',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'countries',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '":',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' [\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
By Tags
Tags are inherited by child components of a given runnable.
If you're using tags to filter, make sure that this is what you want.
const chain = model
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
.withConfig({ tags: ["my_chain"] });
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" },
{ includeTags: ["my_chain"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [ 'my_chain' ],
run_id: '1fed60d6-e0b7-4d5e-8ec7-cd7d3ee5c69f',
metadata: {}
}
{
event: 'on_chat_model_start',
data: { input: { messages: [Array] } },
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_parser_start',
data: {},
name: 'my_parser',
tags: [ 'seq:step:2', 'my_chain' ],
run_id: 'caf24a1e-255c-4937-9f38-6e46275d854a',
metadata: {}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'Certainly',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '!',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: " Here's",
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' the',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' JSON',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' format',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' output',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
Streaming events over HTTP
For convenience, streamEvents supports encoding streamed intermediate events as HTTP server-sent events, encoded as bytes. Here's what that looks like (using a TextDecoder to convert the binary data back into a human-readable string):
const chain = model
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
.withConfig({ tags: ["my_chain"] });
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{
version: "v2",
encoding: "text/event-stream",
}
);
let eventCount = 0;
const textDecoder = new TextDecoder();
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 3) {
continue;
}
console.log(textDecoder.decode(event));
eventCount += 1;
}
event: data
data: {"event":"on_chain_start","data":{"input":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\""},"name":"RunnableSequence","tags":["my_chain"],"run_id":"41cd92f8-9b8c-4365-8aa0-fda3abdae03d","metadata":{}}
event: data
data: {"event":"on_chat_model_start","data":{"input":{"messages":[[{"lc":1,"type":"constructor","id":["langchain_core","messages","HumanMessage"],"kwargs":{"content":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\"","additional_kwargs":{},"response_metadata":{}}}]]}},"name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}
event: data
data: {"event":"on_parser_start","data":{},"name":"my_parser","tags":["seq:step:2","my_chain"],"run_id":"402533c5-0e4e-425d-a556-c30a350972d0","metadata":{}}
event: data
data: {"event":"on_chat_model_stream","data":{"chunk":{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","tool_call_chunks":[],"additional_kwargs":{},"id":"chatcmpl-9lO9BAQwbKDy2Ou2RNFUVi0VunAsL","tool_calls":[],"invalid_tool_calls":[],"response_metadata":{"prompt":0,"completion":0,"finish_reason":null}}}},"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}
A nice feature of this format is that you can pass the resulting stream directly into a native HTTP Response object with the correct headers (commonly used by frameworks like Hono and Next.js), then parse that stream on the frontend. Your server-side handler would look something like this:
const handler = async () => {
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{
version: "v2",
encoding: "text/event-stream",
}
);
return new Response(eventStream, {
headers: {
"content-type": "text/event-stream",
},
});
};
And your frontend could look something like this (using the @microsoft/fetch-event-source package to fetch and parse the event source):
import { fetchEventSource } from "@microsoft/fetch-event-source";
const makeChainRequest = async () => {
await fetchEventSource("https://your_url_here", {
method: "POST",
body: JSON.stringify({
foo: "bar",
}),
onmessage: (message) => {
if (message.event === "data") {
console.log(message.data);
}
},
onerror: (err) => {
console.log(err);
},
});
};
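Inside a handler like onmessage above, you might then decode each payload and pull the streamed tokens out of it. Here's a minimal sketch, assuming the v2 text/event-stream encoding shown earlier, where serialized AIMessageChunks keep their text under kwargs.content (the extractToken helper is hypothetical):
// Hypothetical helper: given one decoded payload (the message.data string
// above), return the token text from serialized on_chat_model_stream events.
const extractToken = (data: string): string => {
  const event = JSON.parse(data);
  if (event.event !== "on_chat_model_stream") {
    return "";
  }
  // The serialized AIMessageChunk keeps its text under kwargs.content.
  return event.data?.chunk?.kwargs?.content ?? "";
};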
Non-streaming components
Remember how some components don't stream well because they don't operate on **input streams**?
While such components can break streaming of the final output when using stream, streamEvents will still yield streaming events from intermediate steps that support streaming!
// A function that operates on finalized inputs
// rather than on an input_stream
import { JsonOutputParser } from "@langchain/core/output_parsers";
import { RunnablePassthrough } from "@langchain/core/runnables";
// A function that does not operate on input streams and breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
if (!Array.isArray(inputs.countries)) {
return "";
}
return JSON.stringify(inputs.countries.map((country) => country.name));
};
const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);
const stream = await chain.stream(
`output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
["France","Spain","Japan"]
As expected, the stream API doesn't work correctly because extractCountryNames doesn't operate on streams.
Now, let's use streamEvents to confirm that we still see streaming output from the model and the parser.
const eventStream = await chain.streamEvents(
`output a list of the countries france, spain and japan and their populations in JSON format.
Use a dict with an outer key of "countries" which contains a list of countries.
Each country should have the key "name" and "population"
Your output should ONLY contain valid JSON data. Do not include any other text or content in your output.`,
{ version: "v2" }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 30) {
continue;
}
const eventType = event.event;
if (eventType === "on_chat_model_stream") {
console.log(`Chat model chunk: ${event.data.chunk.content}`);
} else if (eventType === "on_parser_stream") {
console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
} else {
console.log(eventType);
}
eventCount += 1;
}
Chat model chunk: 
Chat model chunk: Here's
Chat model chunk: how
Chat model chunk: you
Chat model chunk: can
Chat model chunk: represent
Chat model chunk: the countries
Chat model chunk: France
Chat model chunk: ,
Chat model chunk: Spain
Chat model chunk: ,
Chat model chunk: and
Chat model chunk: Japan
Chat model chunk: ,
Chat model chunk: along with
Chat model chunk: their
Chat model chunk: populations
Chat model chunk: ,
Chat model chunk: in
Chat model chunk: JSON
Chat model chunk: format
Chat model chunk: 
Chat model chunk: ```
Chat model chunk: json
Chat model chunk: 
Chat model chunk: {