如何分发自定义回调事件
先决条件
本指南假定您熟悉以下概念
在某些情况下,您可能希望从Runnable 中分发自定义回调事件,以便它可以显示在自定义回调处理程序中或通过Stream Events API 显示。
例如,如果您有一个具有多个步骤的长时间运行的工具,则可以在这些步骤之间分发自定义事件,并使用这些自定义事件来监控进度。您还可以将这些自定义事件显示给应用程序的最终用户,以向他们展示当前任务的进展情况。
要分发自定义事件,您需要为该事件决定两个属性:name
和 data
。
属性 | 类型 | 描述 |
---|---|---|
name | string | 事件的用户定义名称。 |
data | any | 与事件关联的数据。这可以是任何东西,但我们建议将其设为 JSON 可序列化。 |
- 自定义回调事件只能从现有的
Runnable
中分发。 - 如果使用
streamEvents
,您必须使用version: "v2"
来使用自定义事件。 - LangSmith 中尚不支持发送或呈现自定义回调事件。
Stream Events API
使用自定义事件最有效的方法是通过.streamEvents()
方法。
我们可以使用 dispatchCustomEvent
API 从此方法中发出自定义事件。
兼容性
分发自定义回调事件需要 @langchain/core>=0.2.16
。有关升级 @langchain/core
时需要考虑的事项,请参阅本指南。
下面的默认入口点会触发导入和初始化async_hooks
以启用自动 RunnableConfig
传递,这在并非所有环境中都受支持。如果遇到导入问题,则必须从 @langchain/core/callbacks/dispatch/web
中导入,并手动传播 RunnableConfig
对象(参见下面的示例)。
import { RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";
const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});
const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});
for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event2',
tags: [],
metadata: {},
data: 5
}
如果您所处的 Web 环境不支持 async_hooks
,则必须从 Web 入口点导入,并手动传播配置。
import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent as dispatchCustomEventWeb } from "@langchain/core/callbacks/dispatch/web";
const reflect = RunnableLambda.from(
async (value: string, config?: RunnableConfig) => {
await dispatchCustomEventWeb(
"event1",
{ reversed: value.split("").reverse().join("") },
config
);
await dispatchCustomEventWeb("event2", 5, config);
return value;
}
);
const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});
for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event2',
tags: [],
metadata: {},
data: 5
}
回调处理程序
让我们看看如何使用 dispatchCustomEvent
发出自定义事件。
请记住,您**必须**从现有的 Runnable
中调用 dispatchCustomEvent
。
import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";
const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});
await reflect.invoke("hello world", {
callbacks: [
{
handleCustomEvent(eventName, data, runId) {
console.log(eventName, data, runId);
},
},
],
});
event1 { reversed: 'dlrow olleh' } 9c3770ac-c83d-4626-9643-b5fd80eb5431
event2 5 9c3770ac-c83d-4626-9643-b5fd80eb5431
hello world
相关
您现在已经了解了如何在链中发出自定义事件。
您可以查看更深入的关于 流事件 的指南,以了解更多解析和接收链中中间步骤的方法。