跳至主要内容

如何分发自定义回调事件

先决条件

本指南假设您熟悉以下概念

在某些情况下,您可能希望从 Runnable 内部分发自定义回调事件,以便它可以在自定义回调处理程序或通过 Stream 事件 API 中显示。

例如,如果您有一个包含多个步骤的长时间运行工具,您可以在步骤之间分发自定义事件,并使用这些自定义事件来监视进度。您还可以将这些自定义事件显示给应用程序的最终用户,以向他们显示当前任务的进度。

要分发自定义事件,您需要为事件确定两个属性:namedata

属性类型描述
namestring事件的用户定义名称。
dataany与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。
  • 自定义回调事件只能从现有的 Runnable 内部分发。
  • 如果使用 streamEvents,则必须使用 version: "v2" 来使用自定义事件。
  • LangSmith 中尚不支持发送或渲染自定义回调事件。

流事件 API

使用自定义事件的最有用方法是通过 .streamEvents() 方法。

我们可以使用 dispatchCustomEvent API 从此方法发出自定义事件。

兼容性

分发自定义回调事件需要 @langchain/core>=0.2.16。有关升级 @langchain/core 时需要考虑的事项,请参阅 本指南

以下默认入口点会触发 async_hooks 的导入和初始化,以启用自动 RunnableConfig 传递,但这在所有环境中都不受支持。如果您遇到导入问题,则必须从 @langchain/core/callbacks/dispatch/web 中导入,并手动传播 RunnableConfig 对象(参见下面的示例)。

import { RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";

const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});

const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});

for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event2',
tags: [],
metadata: {},
data: 5
}

如果您处于不支持 async_hooks 的 Web 环境中,则必须从 Web 入口点导入,并手动传播配置

import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent as dispatchCustomEventWeb } from "@langchain/core/callbacks/dispatch/web";

const reflect = RunnableLambda.from(
async (value: string, config?: RunnableConfig) => {
await dispatchCustomEventWeb(
"event1",
{ reversed: value.split("").reverse().join("") },
config
);
await dispatchCustomEventWeb("event2", 5, config);
return value;
}
);

const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});

for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event2',
tags: [],
metadata: {},
data: 5
}

回调处理程序

让我们看看如何使用 dispatchCustomEvent 发出自定义事件。

请记住,您**必须**从现有的 Runnable 内部调用 dispatchCustomEvent

import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";

const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});

await reflect.invoke("hello world", {
callbacks: [
{
handleCustomEvent(eventName, data, runId) {
console.log(eventName, data, runId);
},
},
],
});
event1 { reversed: 'dlrow olleh' } 9c3770ac-c83d-4626-9643-b5fd80eb5431
event2 5 9c3770ac-c83d-4626-9643-b5fd80eb5431
hello world

您现在已经了解了如何在链内发出自定义事件。

您可以查看 流事件 的更深入指南,了解有关解析和接收链的中间步骤的更多方法。


此页面是否有帮助?


您也可以在 GitHub 上留下详细的反馈 GitHub.