跳到主要内容

如何并行调用 Runnables

先决条件

本指南假定您熟悉以下概念

RunnableParallel(也称为 RunnableMap)原语是一个对象,其值是 runnables(或可以强制转换为 runnables 的事物,如函数)。它并行运行其所有值,并且每个值都使用 RunnableParallel 的初始输入来调用。最终返回值是一个对象,其中包含每个值的结果,并以其相应的键作为索引。

使用 RunnableParallels 进行格式化

RunnableParallels 对于并行化操作很有用,但也可用于操作一个 Runnable 的输出,以匹配序列中下一个 Runnable 的输入格式。您可以使用它们来拆分或 fork 链,以便多个组件可以并行处理输入。稍后,其他组件可以连接或合并结果以合成最终响应。这种类型的链会创建一个如下所示的计算图

     Input
/ \
/ \
Branch1 Branch2
\ /
\ /
Combine

下面,RunnableParallel 中每个链的输入都应是一个包含 "topic" 键的对象。我们可以通过使用匹配该结构的对象调用我们的链来满足该要求。

npm install @langchain/anthropic @langchain/cohere @langchain/core
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableMap } from "@langchain/core/runnables";
import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({});
const jokeChain = PromptTemplate.fromTemplate(
"Tell me a joke about {topic}"
).pipe(model);
const poemChain = PromptTemplate.fromTemplate(
"write a 2-line poem about {topic}"
).pipe(model);

const mapChain = RunnableMap.from({
joke: jokeChain,
poem: poemChain,
});

const result = await mapChain.invoke({ topic: "bear" });
console.log(result);
/*
{
joke: AIMessage {
content: " Here's a silly joke about a bear:\n" +
'\n' +
'What do you call a bear with no teeth?\n' +
'A gummy bear!',
additional_kwargs: {}
},
poem: AIMessage {
content: ' Here is a 2-line poem about a bear:\n' +
'\n' +
'Furry and wild, the bear roams free \n' +
'Foraging the forest, strong as can be',
additional_kwargs: {}
}
}
*/

API 参考

操作输出/输入

Map 对于操作一个 Runnable 的输出以匹配序列中下一个 Runnable 的输入格式非常有用。

请注意,在 RunnableSequence.from() 调用中的对象会自动强制转换为 runnable map。对象的所有键都必须具有作为 runnable 的值,或者本身可以强制转换为 runnable(函数到 RunnableLambda,或对象到 RunnableMap)。当通过 .pipe() 方法组合链时,也会发生此强制转换。

import { CohereEmbeddings } from "@langchain/cohere";
import { PromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";
import {
RunnablePassthrough,
RunnableSequence,
} from "@langchain/core/runnables";
import { Document } from "@langchain/core/documents";
import { ChatAnthropic } from "@langchain/anthropic";
import { MemoryVectorStore } from "langchain/vectorstores/memory";

const model = new ChatAnthropic();
const vectorstore = await MemoryVectorStore.fromDocuments(
[{ pageContent: "mitochondria is the powerhouse of the cell", metadata: {} }],
new CohereEmbeddings({ model: "embed-english-v3.0" })
);
const retriever = vectorstore.asRetriever();
const template = `Answer the question based only on the following context:
{context}

Question: {question}`;

const prompt = PromptTemplate.fromTemplate(template);

const formatDocs = (docs: Document[]) => docs.map((doc) => doc.pageContent);

const retrievalChain = RunnableSequence.from([
{ context: retriever.pipe(formatDocs), question: new RunnablePassthrough() },
prompt,
model,
new StringOutputParser(),
]);

const result = await retrievalChain.invoke(
"what is the powerhouse of the cell?"
);
console.log(result);

/*
Based on the given context, the powerhouse of the cell is mitochondria.
*/

API 参考

这里,prompt 的输入应为一个包含键 "context" 和 "question" 的 map。用户输入只是问题。因此,我们需要使用检索器获取上下文,并将用户输入以 "question" 键传递。

下一步

您现在了解了一些使用 RunnableParallel 格式化和并行化链步骤的方法。

接下来,您可能对在链中使用自定义逻辑感兴趣。


此页是否对您有帮助?


您也可以留下详细的反馈 在 GitHub 上.