如何分发自定义回调事件
在某些情况下,您可能希望从 Runnable 内部分发自定义回调事件,以便它可以在自定义回调处理程序或通过 Stream 事件 API 中显示。
例如,如果您有一个包含多个步骤的长时间运行工具,您可以在步骤之间分发自定义事件,并使用这些自定义事件来监视进度。您还可以将这些自定义事件显示给应用程序的最终用户,以向他们显示当前任务的进度。
要分发自定义事件,您需要为事件确定两个属性:name
和 data
。
属性 | 类型 | 描述 |
---|---|---|
name | string | 事件的用户定义名称。 |
data | any | 与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
- 自定义回调事件只能从现有的
Runnable
内部分发。 - 如果使用
streamEvents
,则必须使用version: "v2"
来使用自定义事件。 - LangSmith 中尚不支持发送或渲染自定义回调事件。
流事件 API
使用自定义事件的最有用方法是通过 .streamEvents()
方法。
我们可以使用 dispatchCustomEvent
API 从此方法发出自定义事件。
兼容性
分发自定义回调事件需要 @langchain/core>=0.2.16
。有关升级 @langchain/core
时需要考虑的事项,请参阅 本指南。
以下默认入口点会触发 async_hooks
的导入和初始化,以启用自动 RunnableConfig
传递,但这在所有环境中都不受支持。如果您遇到导入问题,则必须从 @langchain/core/callbacks/dispatch/web
中导入,并手动传播 RunnableConfig
对象(参见下面的示例)。
import { RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";
const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});
const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});
for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: '9eac217d-3a2d-4563-a91f-3bd49bee4b3d',
name: 'event2',
tags: [],
metadata: {},
data: 5
}
如果您处于不支持 async_hooks
的 Web 环境中,则必须从 Web 入口点导入,并手动传播配置
import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent as dispatchCustomEventWeb } from "@langchain/core/callbacks/dispatch/web";
const reflect = RunnableLambda.from(
async (value: string, config?: RunnableConfig) => {
await dispatchCustomEventWeb(
"event1",
{ reversed: value.split("").reverse().join("") },
config
);
await dispatchCustomEventWeb("event2", 5, config);
return value;
}
);
const eventStream = await reflect.streamEvents("hello world", {
version: "v2",
});
for await (const event of eventStream) {
if (event.event === "on_custom_event") {
console.log(event);
}
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event1',
tags: [],
metadata: {},
data: { reversed: 'dlrow olleh' }
}
{
event: 'on_custom_event',
run_id: 'dee1e4f0-c5ff-4118-9391-461a0dcc4cb2',
name: 'event2',
tags: [],
metadata: {},
data: 5
}
回调处理程序
让我们看看如何使用 dispatchCustomEvent
发出自定义事件。
请记住,您**必须**从现有的 Runnable
内部调用 dispatchCustomEvent
。
import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";
const reflect = RunnableLambda.from(async (value: string) => {
await dispatchCustomEvent("event1", {
reversed: value.split("").reverse().join(""),
});
await dispatchCustomEvent("event2", 5);
return value;
});
await reflect.invoke("hello world", {
callbacks: [
{
handleCustomEvent(eventName, data, runId) {
console.log(eventName, data, runId);
},
},
],
});
event1 { reversed: 'dlrow olleh' } 9c3770ac-c83d-4626-9643-b5fd80eb5431
event2 5 9c3770ac-c83d-4626-9643-b5fd80eb5431
hello world
相关
您现在已经了解了如何在链内发出自定义事件。
您可以查看 流事件 的更深入指南,了解有关解析和接收链的中间步骤的更多方法。