How to invoke runnables in parallel
This guide assumes familiarity with the following concepts.
The RunnableParallel (also known as RunnableMap) primitive is an object whose values are runnables (or things that can be coerced to runnables, such as functions). It runs all of its values in parallel, and each value is invoked with the initial input to the RunnableParallel. The final return value is an object containing the result of each value under its corresponding key.
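As a minimal sketch of this behavior (assuming plain functions are coerced to RunnableLambdas as described above; the name minimalMap is just illustrative), each value below receives the same input and runs concurrently:
import { RunnableMap } from "@langchain/core/runnables";
// Plain arrow functions are coerced into RunnableLambdas; both receive
// the same input and are invoked in parallel.
const minimalMap = RunnableMap.from({
  doubled: (x: number) => x * 2,
  squared: (x: number) => x * x,
});
const output = await minimalMap.invoke(3);
console.log(output);
/*
  { doubled: 6, squared: 9 }
*/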
Formatting with RunnableParallels
RunnableParallels are useful for parallelizing operations, but they can also be used to manipulate the output of one Runnable to match the input format of the next Runnable in a sequence. You can use them to split or fork a chain so that multiple components can process the input in parallel. Later, other components can join or merge the results to synthesize a final response. This type of chain creates a computation graph that looks like the following:
          Input
         /     \
        /       \
   Branch1    Branch2
        \       /
         \     /
         Combine
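As a rough sketch of this fork-and-combine shape (a hypothetical example, not the chain built later in this guide), the object literal below fans the input out to two branches, and the final function merges their results:
import { RunnableSequence } from "@langchain/core/runnables";
// The object literal is coerced into a parallel map (the two branches),
// and the closing function combines both branch results.
const forkAndCombine = RunnableSequence.from([
  {
    branch1: (input: string) => input.toUpperCase(),
    branch2: (input: string) => input.length,
  },
  (results: { branch1: string; branch2: number }) =>
    `${results.branch1} (${results.branch2} chars)`,
]);
const combined = await forkAndCombine.invoke("hello");
console.log(combined);
/*
  "HELLO (5 chars)"
*/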
Below, the input to each chain in the RunnableParallel is expected to be an object with a "topic" key. We can satisfy that requirement by invoking our chain with an object matching that structure.
- npm: npm install @langchain/anthropic @langchain/cohere @langchain/core
- Yarn: yarn add @langchain/anthropic @langchain/cohere @langchain/core
- pnpm: pnpm add @langchain/anthropic @langchain/cohere @langchain/core
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableMap } from "@langchain/core/runnables";
import { ChatAnthropic } from "@langchain/anthropic";
const model = new ChatAnthropic({});
const jokeChain = PromptTemplate.fromTemplate(
  "Tell me a joke about {topic}"
).pipe(model);
const poemChain = PromptTemplate.fromTemplate(
  "write a 2-line poem about {topic}"
).pipe(model);
const mapChain = RunnableMap.from({
  joke: jokeChain,
  poem: poemChain,
});
const result = await mapChain.invoke({ topic: "bear" });
console.log(result);
/*
  {
    joke: AIMessage {
      content: " Here's a silly joke about a bear:\n" +
        '\n' +
        'What do you call a bear with no teeth?\n' +
        'A gummy bear!',
      additional_kwargs: {}
    },
    poem: AIMessage {
      content: ' Here is a 2-line poem about a bear:\n' +
        '\n' +
        'Furry and wild, the bear roams free \n' +
        'Foraging the forest, strong as can be',
      additional_kwargs: {}
    }
  }
*/
API Reference
- PromptTemplate from @langchain/core/prompts
- RunnableMap from @langchain/core/runnables
- ChatAnthropic from @langchain/anthropic
Manipulating outputs/inputs
Maps are useful for manipulating the output of one Runnable to match the input format of the next Runnable in a sequence.
Note that object literals inside a RunnableSequence.from() call are automatically coerced into runnable maps. Every value in the object must be a runnable or itself coercible to a runnable (functions to RunnableLambdas, or objects to RunnableMaps). This coercion also happens when composing chains via the .pipe() method.
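As a small sketch of this coercion (a hypothetical example; the names splitter and pieces are illustrative, and the same coercion behavior described above is assumed for .pipe()):
import { RunnableLambda, RunnablePassthrough } from "@langchain/core/runnables";
// Piping into an object literal coerces it into a RunnableMap;
// the arrow function value is in turn coerced into a RunnableLambda.
const splitter = RunnableLambda.from((text: string) => text.trim()).pipe({
  original: new RunnablePassthrough(),
  uppercased: (text: string) => text.toUpperCase(),
});
const pieces = await splitter.invoke("  hello  ");
console.log(pieces);
/*
  { original: "hello", uppercased: "HELLO" }
*/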
import { CohereEmbeddings } from "@langchain/cohere";
import { PromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";
import {
  RunnablePassthrough,
  RunnableSequence,
} from "@langchain/core/runnables";
import { Document } from "@langchain/core/documents";
import { ChatAnthropic } from "@langchain/anthropic";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
const model = new ChatAnthropic();
const vectorstore = await MemoryVectorStore.fromDocuments(
  [{ pageContent: "mitochondria is the powerhouse of the cell", metadata: {} }],
  new CohereEmbeddings({ model: "embed-english-v3.0" })
);
const retriever = vectorstore.asRetriever();
const template = `Answer the question based only on the following context:
{context}
Question: {question}`;
const prompt = PromptTemplate.fromTemplate(template);
const formatDocs = (docs: Document[]) => docs.map((doc) => doc.pageContent);
const retrievalChain = RunnableSequence.from([
  { context: retriever.pipe(formatDocs), question: new RunnablePassthrough() },
  prompt,
  model,
  new StringOutputParser(),
]);
const result = await retrievalChain.invoke(
  "what is the powerhouse of the cell?"
);
console.log(result);
/*
Based on the given context, the powerhouse of the cell is mitochondria.
*/
API Reference
- CohereEmbeddings from @langchain/cohere
- PromptTemplate from @langchain/core/prompts
- StringOutputParser from @langchain/core/output_parsers
- RunnablePassthrough from @langchain/core/runnables
- RunnableSequence from @langchain/core/runnables
- Document from @langchain/core/documents
- ChatAnthropic from @langchain/anthropic
- MemoryVectorStore from langchain/vectorstores/memory
Here, the input to the prompt is expected to be a map with "context" and "question" keys. The user input is just the question, so we need to fetch the context using our retriever and pass the user input through under the "question" key, as the sketch below illustrates.
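As a sketch of what the prompt actually receives (reusing retriever and formatDocs from the example above; the name promptInputMap and the exact output shown in the comment are assumptions), you could invoke just the map step on its own:
import { RunnableMap, RunnablePassthrough } from "@langchain/core/runnables";
// Invoking only the coerced map step shows the object handed to the prompt:
// the retriever + formatDocs pipeline fills "context", while the passthrough
// forwards the raw question string under "question".
const promptInputMap = RunnableMap.from({
  context: retriever.pipe(formatDocs),
  question: new RunnablePassthrough(),
});
const promptInput = await promptInputMap.invoke(
  "what is the powerhouse of the cell?"
);
console.log(promptInput);
/*
  {
    context: [ "mitochondria is the powerhouse of the cell" ],
    question: "what is the powerhouse of the cell?"
  }
*/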
Next steps
You now know some ways to format and parallelize chain steps with RunnableParallel.
Next, you might be interested in using custom logic in your chains.