How to reindex data to keep your vectorstore in-sync with the underlying data source
Here, we will look at a basic indexing workflow using the LangChain indexing API.
The indexing API lets you load documents from any source and keep them in sync with your vector store. Specifically, it helps you:
- Avoid writing duplicated content into the vector store
- Avoid re-writing unchanged content
- Avoid re-computing embeddings over unchanged content
All of which should save you time and money, as well as improve your vector search results.
Crucially, the indexing API works even with documents that have gone through several transformation steps (e.g., via text chunking) with respect to the original source documents.
How it works
LangChain indexing makes use of a record manager (RecordManager) that keeps track of document writes into the vector store.
When indexing content, hashes are computed for each document, and the following information is stored in the record manager:
- The document hash (a hash of both page content and metadata)
- The write time
- The source id -- each document should include information in its metadata that lets us determine the ultimate source of the document
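As a purely illustrative sketch (the real RecordManager computes its hashes internally and may use a different scheme), the de-duplication key can be thought of as a stable hash over page content plus metadata:

import { createHash } from "node:crypto";

// Illustrative only: NOT the actual RecordManager implementation.
function documentHash(doc: {
  pageContent: string;
  metadata: Record<string, any>;
}): string {
  return createHash("sha256")
    .update(doc.pageContent)
    .update(JSON.stringify(doc.metadata))
    .digest("hex");
}

// Identical content + metadata always hash to the same key, which is
// how duplicate writes can be detected and skipped.
const a = { pageContent: "kitty", metadata: { source: "kitty.txt" } };
const b = { pageContent: "kitty", metadata: { source: "kitty.txt" } };
console.log(documentHash(a) === documentHash(b)); // true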
Deletion modes
When indexing documents into a vector store, it's possible that some existing documents in the store should be deleted. In certain situations you may want to remove any existing documents that come from the same sources as the new documents being indexed. In others you may want to delete all existing documents wholesale. The indexing API deletion modes let you pick the behavior you want:
| Cleanup mode | De-duplicates content | Parallelizable | Cleans up deleted source docs | Cleans up mutations of source docs and/or derived docs | Cleanup timing |
| --- | --- | --- | --- | --- | --- |
| None | ✅ | ✅ | ❌ | ❌ | - |
| Incremental | ✅ | ✅ | ❌ | ✅ | Continuously |
| Full | ✅ | ❌ | ✅ | ✅ | At end of indexing |
None does not do any automatic clean up, allowing the user to manually clean up old content.
incremental and full offer the following automated clean up:
- If the content of a source document or derived document has changed, both the incremental and full modes will clean up (delete) previous versions of the content.
- If a source document has been deleted (meaning it is not included in the documents currently being indexed), the full cleanup mode will delete it from the vector store correctly, but the incremental mode will not.
When content is mutated (e.g., a source PDF file is revised), there will be a period of time during indexing when both the new and old versions may be returned to the user. This happens after the new content is written, but before the old version is deleted. incremental indexing minimizes this window, since it can clean up continuously as it writes; full mode performs the clean up only after all batches have been written.
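All three modes are selected with the same call shape, via options.cleanup on index(). The quickstart below exercises each mode in full; in outline (the variables here are placeholders that the quickstart defines concretely):

// cleanup: undefined     -> no automatic clean up (de-duplication only)
// cleanup: "incremental" -> clean up mutated docs per source, as it writes
// cleanup: "full"        -> also clean up deleted sources, after all writes
await index({
  docsSource: docs,        // documents (or a document loader) to index
  recordManager,           // tracks hashes, write times, and source ids
  vectorStore,             // the target vector store
  options: {
    cleanup: "incremental",
    sourceIdKey: "source", // metadata key naming each doc's source
  },
});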
Requirements
- Do not use with a store that has been pre-populated with content independently of the indexing API, as the record manager will not know that records have been inserted previously.
- Only works with a LangChain vectorstore that supports: a) adding documents by id (the addDocuments method with an ids argument), and b) deleting by id (the delete method with an ids argument). See the interface sketch after the list below.
Compatible vector stores: PGVector, Chroma, CloudflareVectorize, ElasticVectorSearch, FAISS, MomentoVectorIndex, Pinecone, SupabaseVectorStore, VercelPostgresVectorStore, Weaviate, Xata
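For illustration, the capability those two requirements describe can be sketched as the following TypeScript interface. This is not LangChain's actual base-class definition, just the shape the indexing API depends on:

// Illustrative shape only; real LangChain vector stores extend the
// VectorStore base class rather than implementing this interface.
interface IndexableVectorStore {
  // a) Add documents with caller-supplied ids, so re-writing the same
  //    content under the same id does not create a duplicate.
  addDocuments(
    documents: { pageContent: string; metadata: Record<string, any> }[],
    options?: { ids?: string[] }
  ): Promise<string[] | void>;

  // b) Delete previously written documents by those same ids.
  delete(params: { ids: string[] }): Promise<void>;
}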
Caution
The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using the full or incremental cleanup modes).
If two tasks run back to back, and the first task finishes before the clock time changes, then the second task may not be able to clean up content.
This is unlikely to be an issue in practice, for the following reasons:
- The RecordManager uses higher-resolution timestamps.
- The data would need to change between the first and second task runs, which is unlikely if the interval between the tasks is small.
- Indexing tasks typically take more than a few milliseconds.
Quickstart
import { PostgresRecordManager } from "@langchain/community/indexes/postgres";
import { index } from "langchain/indexes";
import { PGVectorStore } from "@langchain/community/vectorstores/pgvector";
import { PoolConfig } from "pg";
import { OpenAIEmbeddings } from "@langchain/openai";
import { CharacterTextSplitter } from "@langchain/textsplitters";
import { BaseDocumentLoader } from "@langchain/core/document_loaders/base";
// First, follow set-up instructions at
// https://js.langchain.ac.cn/docs/modules/indexes/vector_stores/integrations/pgvector
const config = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "testlangchain",
columns: {
idColumnName: "id",
vectorColumnName: "vector",
contentColumnName: "content",
metadataColumnName: "metadata",
},
};
const vectorStore = await PGVectorStore.initialize(
new OpenAIEmbeddings(),
config
);
// Create a new record manager
const recordManagerConfig = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "upsertion_records",
};
const recordManager = new PostgresRecordManager(
"test_namespace",
recordManagerConfig
);
// Create the schema if it doesn't exist
await recordManager.createSchema();
// Index some documents
const doc1 = {
pageContent: "kitty",
metadata: { source: "kitty.txt" },
};
const doc2 = {
pageContent: "doggy",
metadata: { source: "doggy.txt" },
};
/**
 * Hacky helper method to clear content. See the `full` mode section to understand why it works.
*/
async function clear() {
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
});
}
// No cleanup
await clear();
// This mode does not do automatic clean up of old versions of content; however, it still takes care of content de-duplication.
console.log(
await index({
docsSource: [doc1, doc1, doc1, doc1, doc1, doc1],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
await clear();
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Second time around all content will be skipped
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/
// Updated content will be added, but old won't be deleted
const doc1Updated = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};
console.log(
await index({
docsSource: [doc1Updated, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 1,
}
*/
/*
Resulting records in the database:
[
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
{
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
}
]
*/
// Incremental mode
await clear();
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Indexing again should result in both documents getting skipped – also skipping the embedding operation!
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/
// If we provide no documents with incremental indexing mode, nothing will change.
console.log(
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// If we mutate a document, the new version will be written and all old versions sharing the same source will be deleted.
// This only affects the documents with the same source id!
const changedDoc1 = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};
console.log(
await index({
docsSource: [changedDoc1],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 1,
numSkipped: 0,
}
*/
// Full mode
await clear();
// In full mode the user should pass the full universe of content that should be indexed into the indexing function.
// Any documents that are not passed into the indexing function and are present in the vectorStore will be deleted!
// This behavior is useful to handle deletions of source documents.
const allDocs = [doc1, doc2];
console.log(
await index({
docsSource: allDocs,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Say someone deleted the first doc:
const doc2Only = [doc2];
// Using full mode will clean up the deleted content as well.
// This affects all documents regardless of source id!
console.log(
await index({
docsSource: doc2Only,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 1,
numSkipped: 1,
}
*/
await clear();
const newDoc1 = {
pageContent: "kitty kitty kitty kitty kitty",
metadata: { source: "kitty.txt" },
};
const newDoc2 = {
pageContent: "doggy doggy the doggy",
metadata: { source: "doggy.txt" },
};
const splitter = new CharacterTextSplitter({
separator: "t",
keepSeparator: true,
chunkSize: 12,
chunkOverlap: 2,
});
const newDocs = await splitter.splitDocuments([newDoc1, newDoc2]);
console.log(newDocs);
/*
[
{
pageContent: 'kitty kit',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty ki',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty',
metadata: {source: 'kitty.txt'},
},
{
pageContent: 'doggy doggy',
metadata: {source: 'doggy.txt'},
},
{
pageContent: 'the doggy',
metadata: {source: 'doggy.txt'},
}
]
*/
console.log(
await index({
docsSource: newDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 5,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
const changedDoggyDocs = [
{
pageContent: "woof woof",
metadata: { source: "doggy.txt" },
},
{
pageContent: "woof woof woof",
metadata: { source: "doggy.txt" },
},
];
console.log(
await index({
docsSource: changedDoggyDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 2,
numSkipped: 0,
}
*/
// Usage with document loaders
// Create a document loader
class MyCustomDocumentLoader extends BaseDocumentLoader {
load() {
return Promise.resolve([
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
]);
}
}
await clear();
const loader = new MyCustomDocumentLoader();
console.log(
await index({
docsSource: loader,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Closing resources
await recordManager.end();
await vectorStore.end();
API Reference
- PostgresRecordManager from @langchain/community/indexes/postgres
- index from langchain/indexes
- PGVectorStore from @langchain/community/vectorstores/pgvector
- OpenAIEmbeddings from @langchain/openai
- CharacterTextSplitter from @langchain/textsplitters
- BaseDocumentLoader from @langchain/core/document_loaders/base
Next steps
You've now learned how to use indexing in your RAG pipelines.
Next, check out some of the other sections on retrieval.