跳到主要内容

如何分发自定义回调事件

前提条件

本指南假定您熟悉以下概念

在某些情况下,您可能希望从 Runnable 中分发自定义回调事件,以便可以在自定义回调处理程序中或通过 Stream Events API 显示它。

例如,如果您有一个包含多个步骤的长时间运行的工具,则可以在步骤之间分发自定义事件,并使用这些自定义事件来监视进度。您还可以将这些自定义事件呈现给应用程序的最终用户,以向他们展示当前任务的进度。

要分发自定义事件,您需要为事件确定两个属性:namedata

属性类型描述
名称字符串用户为事件定义的名称。
数据任意类型与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。
  • 自定义回调事件只能从现有的 Runnable 中分发。
  • 如果使用 streamEvents,则必须使用 version: "v2" 才能使用自定义事件。
  • LangSmith 尚不支持发送或渲染自定义回调事件。

Stream Events API

使用自定义事件的最有用方法是通过 .streamEvents() 方法。

我们可以使用 dispatchCustomEvent API 从此方法发出自定义事件。

兼容性

分发自定义回调事件需要 @langchain/core>=0.2.16。有关升级 @langchain/core 时需要考虑的一些事项,请参阅本指南

下面的默认入口点触发 async_hooks 的导入和初始化,以启用自动 RunnableConfig 传递,这并非在所有环境中都受支持。如果您看到导入问题,则必须从 @langchain/core/callbacks/dispatch/web 导入并手动传播 RunnableConfig 对象(请参阅下面的示例)。

import { RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";

const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});

const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});

for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event2',
tags: [],
metadata: {},
data: 5
}

如果您在不支持 async_hooks 的 Web 环境中,则必须从 Web 入口点导入并改为手动传播配置

import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent as dispatchCustomEventWeb } from "@langchain/core/callbacks/dispatch/web";

const reflect = RunnableLambda.from(
async (value: string, config?: RunnableConfig) => {
await dispatchCustomEventWeb(
"event1",
{ reversed: value.split("").reverse().join("") },
config
);
await dispatchCustomEventWeb("event2", 5, config);
return value;
}
);

const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});

for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event2',
tags: [],
metadata: {},
data: 5
}

回调处理程序

让我们看看如何使用 dispatchCustomEvent 发出自定义事件。

请记住,您必须从现有的 Runnable 中调用 dispatchCustomEvent

import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";

const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});

await reflect.invoke("hello world", {
callbacks: [
{
handleCustomEvent(eventName, data, runId) {
console.log(eventName, data, runId);
},
},
],
});
event1 { reversed: 'dlrow olleh' } 9c3770ac-c83d-4626-9643-b5fd80eb5431
event2 5 9c3770ac-c83d-4626-9643-b5fd80eb5431
hello world

您现在已经了解了如何从链中发出自定义事件。

您可以查看有关 流事件的更深入的指南,以了解更多解析和接收链中中间步骤的方法。


此页对您有帮助吗?


您也可以留下详细的反馈 在 GitHub 上.