如何重新索引数据以保持你的向量数据库与底层数据源同步
本指南假设你熟悉以下概念
在这里,我们将使用 LangChain 索引 API 了解基本的索引工作流程。
索引 API 允许你加载和同步来自任何来源的文档到向量存储中。具体来说,它有助于
- 避免将重复内容写入向量存储
- 避免重新写入未更改的内容
- 避免在未更改的内容上重新计算嵌入
所有这些都应该节省你的时间和金钱,并提高你的向量搜索结果。
重要的是,即使文档相对于原始源文档经历了多个转换步骤(例如,通过文本分块),索引 API 也能正常工作。
工作原理
LangChain 索引使用记录管理器 (RecordManager
) 来跟踪写入向量存储的文档。
索引内容时,会为每个文档计算哈希值,并将以下信息存储在记录管理器中
- 文档哈希值(页面内容和元数据的哈希值)
- 写入时间
- 源 ID - 每个文档的元数据中都应包含信息,以便我们确定此文档的最终来源
删除模式
将文档索引到向量存储中时,向量存储中可能存在一些现有的文档应该被删除。在某些情况下,你可能想要删除所有来自与新索引文档相同来源的现有文档。在其他情况下,你可能想要彻底删除所有现有文档。索引 API 删除模式允许你选择想要的行为
清理模式 | 消除重复内容 | 可并行化 | 清理已删除的源文档 | 清理源文档和/或派生文档的变动 | 清理时间 |
---|---|---|---|---|---|
无 | ✅ | ✅ | ❌ | ❌ | - |
增量 | ✅ | ✅ | ❌ | ✅ | 持续 |
完整 | ✅ | ❌ | ✅ | ✅ | 在索引结束时 |
无
不执行任何自动清理,允许用户手动清理旧内容。
增量
和 完整
提供以下自动清理
- 如果源文档或派生文档的内容发生了更改,
增量
或完整
模式都将清理(删除)内容的先前版本。 - 如果源文档已被删除(这意味着它不包含在当前正在索引的文档中),则完整清理模式将从向量存储中正确删除它,但
增量
模式不会。
当内容发生变动时(例如,源 PDF 文件被修订),在索引期间将有一段时间,新旧版本都可能被返回给用户。这发生在新内容被写入后,但在旧版本被删除之前。
增量
索引将这段时间最小化,因为它能够在写入时持续进行清理。完整
模式在所有批次写入后进行清理。
要求
- 不要与独立于索引 API 预先填充内容的存储一起使用,因为记录管理器将不知道之前已插入记录。
- 仅适用于支持以下内容的 LangChain
向量存储
:a). 按 ID 添加文档(addDocuments
方法,带有 id 参数)b). 按 ID 删除(delete 方法,带有 id 参数)
兼容的向量存储:PGVector
、Chroma
、CloudflareVectorize
、ElasticVectorSearch
、FAISS
、MomentoVectorIndex
、Pinecone
、SupabaseVectorStore
、VercelPostgresVectorStore
、Weaviate
、Xata
注意
记录管理器依赖于基于时间的机制来确定可以清理哪些内容(当使用 full
或 incremental
清理模式时)。
如果两个任务背靠背运行,并且第一个任务在时钟时间更改之前完成,则第二个任务可能无法清理内容。
由于以下原因,这在实际设置中不太可能成为问题
RecordManager
使用更高分辨率的时间戳。- 数据需要在第一个和第二个任务运行之间发生变化,如果任务之间的时间间隔很小,这不太可能发生。
- 索引任务通常需要超过几毫秒。
快速入门
import { PostgresRecordManager } from "@langchain/community/indexes/postgres";
import { index } from "langchain/indexes";
import { PGVectorStore } from "@langchain/community/vectorstores/pgvector";
import { PoolConfig } from "pg";
import { OpenAIEmbeddings } from "@langchain/openai";
import { CharacterTextSplitter } from "@langchain/textsplitters";
import { BaseDocumentLoader } from "@langchain/core/document_loaders/base";
// First, follow set-up instructions at
// https://js.langchain.ac.cn/docs/modules/indexes/vector_stores/integrations/pgvector
const config = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "testlangchain",
columns: {
idColumnName: "id",
vectorColumnName: "vector",
contentColumnName: "content",
metadataColumnName: "metadata",
},
};
const vectorStore = await PGVectorStore.initialize(
new OpenAIEmbeddings(),
config
);
// Create a new record manager
const recordManagerConfig = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "upsertion_records",
};
const recordManager = new PostgresRecordManager(
"test_namespace",
recordManagerConfig
);
// Create the schema if it doesn't exist
await recordManager.createSchema();
// Index some documents
const doc1 = {
pageContent: "kitty",
metadata: { source: "kitty.txt" },
};
const doc2 = {
pageContent: "doggy",
metadata: { source: "doggy.txt" },
};
/**
* Hacky helper method to clear content. See the `full` mode section to to understand why it works.
*/
async function clear() {
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
});
}
// No cleanup
await clear();
// This mode does not do automatic clean up of old versions of content; however, it still takes care of content de-duplication.
console.log(
await index({
docsSource: [doc1, doc1, doc1, doc1, doc1, doc1],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
await clear();
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Second time around all content will be skipped
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/
// Updated content will be added, but old won't be deleted
const doc1Updated = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};
console.log(
await index({
docsSource: [doc1Updated, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 1,
}
*/
/*
Resulting records in the database:
[
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
{
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
}
]
*/
// Incremental mode
await clear();
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Indexing again should result in both documents getting skipped – also skipping the embedding operation!
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/
// If we provide no documents with incremental indexing mode, nothing will change.
console.log(
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// If we mutate a document, the new version will be written and all old versions sharing the same source will be deleted.
// This only affects the documents with the same source id!
const changedDoc1 = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};
console.log(
await index({
docsSource: [changedDoc1],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 1,
numSkipped: 0,
}
*/
// Full mode
await clear();
// In full mode the user should pass the full universe of content that should be indexed into the indexing function.
// Any documents that are not passed into the indexing function and are present in the vectorStore will be deleted!
// This behavior is useful to handle deletions of source documents.
const allDocs = [doc1, doc2];
console.log(
await index({
docsSource: allDocs,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Say someone deleted the first doc:
const doc2Only = [doc2];
// Using full mode will clean up the deleted content as well.
// This afffects all documents regardless of source id!
console.log(
await index({
docsSource: doc2Only,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 1,
numSkipped: 1,
}
*/
await clear();
const newDoc1 = {
pageContent: "kitty kitty kitty kitty kitty",
metadata: { source: "kitty.txt" },
};
const newDoc2 = {
pageContent: "doggy doggy the doggy",
metadata: { source: "doggy.txt" },
};
const splitter = new CharacterTextSplitter({
separator: "t",
keepSeparator: true,
chunkSize: 12,
chunkOverlap: 2,
});
const newDocs = await splitter.splitDocuments([newDoc1, newDoc2]);
console.log(newDocs);
/*
[
{
pageContent: 'kitty kit',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty ki',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty',
metadata: {source: 'kitty.txt'},
},
{
pageContent: 'doggy doggy',
metadata: {source: 'doggy.txt'},
{
pageContent: 'the doggy',
metadata: {source: 'doggy.txt'},
}
]
*/
console.log(
await index({
docsSource: newDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 5,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
const changedDoggyDocs = [
{
pageContent: "woof woof",
metadata: { source: "doggy.txt" },
},
{
pageContent: "woof woof woof",
metadata: { source: "doggy.txt" },
},
];
console.log(
await index({
docsSource: changedDoggyDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 2,
numSkipped: 0,
}
*/
// Usage with document loaders
// Create a document loader
class MyCustomDocumentLoader extends BaseDocumentLoader {
load() {
return Promise.resolve([
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
]);
}
}
await clear();
const loader = new MyCustomDocumentLoader();
console.log(
await index({
docsSource: loader,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Closing resources
await recordManager.end();
await vectorStore.end();
API 参考
- PostgresRecordManager 来自
@langchain/community/indexes/postgres
- index 来自
langchain/indexes
- PGVectorStore 来自
@langchain/community/vectorstores/pgvector
- OpenAIEmbeddings 来自
@langchain/openai
- CharacterTextSplitter 来自
@langchain/textsplitters
- BaseDocumentLoader 来自
@langchain/core/document_loaders/base
下一步
您现在已经了解了如何在 RAG 管道中使用索引。
接下来,查看有关检索的其他部分。