
How to reindex data to keep your vectorstore in sync with the underlying data source

Prerequisites

This guide assumes familiarity with the following concepts.

Here, we will look at a basic indexing workflow using the LangChain indexing API.

The indexing API lets you load documents from any source and keep them in sync with a vectorstore. Specifically, it helps:

  • avoid writing duplicated content into the vectorstore
  • avoid re-writing unchanged content
  • avoid re-computing embeddings over unchanged content

All of which should save you time and money, as well as improve your vector search results.

Crucially, the indexing API will work even with documents that have gone through several transformation steps (e.g., via text chunking) with respect to the original source documents.

How it works

LangChain indexing makes use of a record manager (RecordManager) that keeps track of document writes into the vectorstore.

When indexing content, hashes are computed for each document, and the following information is stored in the record manager:

  • the document hash (hash of both page content and metadata)
  • write time
  • the source id - each document should include information in its metadata so that we can determine the ultimate source of the document
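Conceptually, each entry the record manager keeps can be pictured as in the sketch below. This is a minimal illustration only; the IndexRecord shape and the hashDocument helper are assumptions made for exposition, not the actual RecordManager internals.

import { createHash } from "node:crypto";

// Hypothetical record shape -- illustrative only, not the real schema.
interface IndexRecord {
  key: string; // hash of page content + metadata
  updatedAt: number; // write time
  groupId: string | null; // source id taken from the document's metadata
}

// Sketch: derive a stable content hash from a document.
function hashDocument(doc: {
  pageContent: string;
  metadata: Record<string, unknown>;
}): string {
  const serialized = JSON.stringify([doc.pageContent, doc.metadata]);
  return createHash("sha256").update(serialized).digest("hex");
}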

Deletion modes

When indexing documents into a vectorstore, it's possible that some existing documents in the vectorstore should be deleted. In certain situations you may want to remove any existing documents that are derived from the same sources as the new documents being indexed. In others you may want to delete all existing documents wholesale. The indexing API deletion modes let you pick the behavior you want:

| Cleanup mode | De-duplicates content | Parallelizable | Cleans up deleted source docs | Cleans up mutations of source docs and/or derived docs | Clean up timing |
| --- | --- | --- | --- | --- | --- |
| None | ✅ | ✅ | ❌ | ❌ | - |
| Incremental | ✅ | ✅ | ❌ | ✅ | Continuously |
| Full | ✅ | ❌ | ✅ | ✅ | At end of indexing |

None does not do any automatic clean up, allowing the user to manually do clean up of old content.

incremental and full offer the following automated clean up:

  • If the content of the source document or derived documents has changed, both incremental and full modes will clean up (delete) previous versions of the content.
  • If the source document has been deleted (meaning it is not included in the documents currently being indexed), the full cleanup mode will correctly delete it from the vectorstore, but the incremental mode will not.

When content is mutated (e.g., the source PDF file was revised), there will be a period of time during indexing when both the new and old versions may be returned to the user. This happens after the new content is written, but before the old version is deleted.

  • incremental indexing minimizes this period of time, as it is able to do clean up continuously as it writes.
  • full mode does the clean up after all batches have been written.
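In the JS API, these modes map onto the cleanup option of the index() function used throughout the Quickstart below. A condensed view, assuming docs, recordManager, and vectorStore are already set up as in the Quickstart:

// cleanup: undefined     -> no automatic clean up (de-duplication only)
// cleanup: "incremental" -> continuous clean up of mutated content, per source
// cleanup: "full"        -> deletes anything not passed in, at end of indexing
await index({
  docsSource: docs,
  recordManager,
  vectorStore,
  options: {
    cleanup: "incremental", // or "full", or undefined
    sourceIdKey: "source", // metadata key that identifies the source document
  },
});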

Requirements

  1. Do not use with a store that has been pre-populated with content independently of the indexing API, as the record manager will not know that records have been inserted previously.
  2. Only works with LangChain vectorstores that support: a) document addition by id (the addDocuments method with an ids argument), and b) deletion by id (the delete method with an ids argument), as sketched below.
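Concretely, the store needs to expose something like the surface below. This is a simplified sketch of just the two relevant methods, not the full LangChain VectorStore interface:

import type { Document } from "@langchain/core/documents";

// Simplified sketch of the vectorstore surface the indexing API relies on.
interface IndexableVectorStore {
  // Upsert documents under caller-provided ids.
  addDocuments(
    documents: Document[],
    options?: { ids?: string[] }
  ): Promise<string[] | void>;
  // Delete previously written documents by id.
  delete(params: { ids?: string[] }): Promise<void>;
}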

Compatible vectorstores: PGVector, Chroma, CloudflareVectorize, ElasticVectorSearch, FAISS, MomentoVectorIndex, Pinecone, SupabaseVectorStore, VercelPostgresVectorStore, Weaviate, Xata.

Caution

The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using full or incremental cleanup modes).

If two tasks run back to back, and the first task finishes before the clock time changes, then the second task may not be able to clean up content.

This is unlikely to be an issue in actual settings for the following reasons:

  1. The RecordManager uses higher-resolution timestamps.
  2. The data would need to change between the first and the second task runs, which becomes unlikely if the time interval between the tasks is small.
  3. Indexing tasks typically take more than a few milliseconds.

Quickstart

import { PostgresRecordManager } from "@langchain/community/indexes/postgres";
import { index } from "langchain/indexes";
import { PGVectorStore } from "@langchain/community/vectorstores/pgvector";
import { PoolConfig } from "pg";
import { OpenAIEmbeddings } from "@langchain/openai";
import { CharacterTextSplitter } from "@langchain/textsplitters";
import { BaseDocumentLoader } from "@langchain/core/document_loaders/base";

// First, follow set-up instructions at
// https://js.langchain.ac.cn/docs/modules/indexes/vector_stores/integrations/pgvector

const config = {
  postgresConnectionOptions: {
    type: "postgres",
    host: "127.0.0.1",
    port: 5432,
    user: "myuser",
    password: "ChangeMe",
    database: "api",
  } as PoolConfig,
  tableName: "testlangchain",
  columns: {
    idColumnName: "id",
    vectorColumnName: "vector",
    contentColumnName: "content",
    metadataColumnName: "metadata",
  },
};

const vectorStore = await PGVectorStore.initialize(
  new OpenAIEmbeddings(),
  config
);

// Create a new record manager
const recordManagerConfig = {
  postgresConnectionOptions: {
    type: "postgres",
    host: "127.0.0.1",
    port: 5432,
    user: "myuser",
    password: "ChangeMe",
    database: "api",
  } as PoolConfig,
  tableName: "upsertion_records",
};
const recordManager = new PostgresRecordManager(
  "test_namespace",
  recordManagerConfig
);

// Create the schema if it doesn't exist
await recordManager.createSchema();

// Index some documents
const doc1 = {
  pageContent: "kitty",
  metadata: { source: "kitty.txt" },
};

const doc2 = {
  pageContent: "doggy",
  metadata: { source: "doggy.txt" },
};

/**
 * Hacky helper method to clear content. See the `full` mode section to understand why it works.
 */
async function clear() {
  await index({
    docsSource: [],
    recordManager,
    vectorStore,
    options: {
      cleanup: "full",
      sourceIdKey: "source",
    },
  });
}

// No cleanup
await clear();
// This mode does not do automatic clean up of old versions of content; however, it still takes care of content de-duplication.

console.log(
  await index({
    docsSource: [doc1, doc1, doc1, doc1, doc1, doc1],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 1,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 0,
  }
*/

await clear();

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 2,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 0,
  }
*/

// Second time around all content will be skipped

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 0,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 2,
  }
*/

// Updated content will be added, but old won't be deleted

const doc1Updated = {
  pageContent: "kitty updated",
  metadata: { source: "kitty.txt" },
};

console.log(
  await index({
    docsSource: [doc1Updated, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 1,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 1,
  }
*/

/*
  Resulting records in the database:
  [
    {
      pageContent: "kitty",
      metadata: { source: "kitty.txt" },
    },
    {
      pageContent: "doggy",
      metadata: { source: "doggy.txt" },
    },
    {
      pageContent: "kitty updated",
      metadata: { source: "kitty.txt" },
    }
  ]
*/

// Incremental mode
await clear();

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 2,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 0,
  }
*/

// Indexing again should result in both documents getting skipped – also skipping the embedding operation!

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 0,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 2,
  }
*/

// If we provide no documents with incremental indexing mode, nothing will change.
console.log(
  await index({
    docsSource: [],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 0,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 0,
  }
*/

// If we mutate a document, the new version will be written and all old versions sharing the same source will be deleted.
// This only affects the documents with the same source id!

const changedDoc1 = {
  pageContent: "kitty updated",
  metadata: { source: "kitty.txt" },
};
console.log(
  await index({
    docsSource: [changedDoc1],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 1,
    numUpdated: 0,
    numDeleted: 1,
    numSkipped: 0,
  }
*/

// Full mode
await clear();
// In full mode the user should pass the full universe of content that should be indexed into the indexing function.

// Any documents that are not passed into the indexing function and are present in the vectorStore will be deleted!

// This behavior is useful to handle deletions of source documents.
const allDocs = [doc1, doc2];
console.log(
  await index({
    docsSource: allDocs,
    recordManager,
    vectorStore,
    options: {
      cleanup: "full",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 2,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 0,
  }
*/

// Say someone deleted the first doc:

const doc2Only = [doc2];

// Using full mode will clean up the deleted content as well.
// This affects all documents regardless of source id!

console.log(
  await index({
    docsSource: doc2Only,
    recordManager,
    vectorStore,
    options: {
      cleanup: "full",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 0,
    numUpdated: 0,
    numDeleted: 1,
    numSkipped: 1,
  }
*/

await clear();

const newDoc1 = {
  pageContent: "kitty kitty kitty kitty kitty",
  metadata: { source: "kitty.txt" },
};

const newDoc2 = {
  pageContent: "doggy doggy the doggy",
  metadata: { source: "doggy.txt" },
};

const splitter = new CharacterTextSplitter({
  separator: "t",
  keepSeparator: true,
  chunkSize: 12,
  chunkOverlap: 2,
});

const newDocs = await splitter.splitDocuments([newDoc1, newDoc2]);
console.log(newDocs);
/*
  [
    {
      pageContent: 'kitty kit',
      metadata: { source: 'kitty.txt' }
    },
    {
      pageContent: 'tty kitty ki',
      metadata: { source: 'kitty.txt' }
    },
    {
      pageContent: 'tty kitty',
      metadata: { source: 'kitty.txt' }
    },
    {
      pageContent: 'doggy doggy',
      metadata: { source: 'doggy.txt' }
    },
    {
      pageContent: 'the doggy',
      metadata: { source: 'doggy.txt' }
    }
  ]
*/

console.log(
  await index({
    docsSource: newDocs,
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);
/*
  {
    numAdded: 5,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 0,
  }
*/

const changedDoggyDocs = [
  {
    pageContent: "woof woof",
    metadata: { source: "doggy.txt" },
  },
  {
    pageContent: "woof woof woof",
    metadata: { source: "doggy.txt" },
  },
];

console.log(
  await index({
    docsSource: changedDoggyDocs,
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 2,
    numUpdated: 0,
    numDeleted: 2,
    numSkipped: 0,
  }
*/

// Usage with document loaders

// Create a document loader
class MyCustomDocumentLoader extends BaseDocumentLoader {
  load() {
    return Promise.resolve([
      {
        pageContent: "kitty",
        metadata: { source: "kitty.txt" },
      },
      {
        pageContent: "doggy",
        metadata: { source: "doggy.txt" },
      },
    ]);
  }
}

await clear();

const loader = new MyCustomDocumentLoader();

console.log(
  await index({
    docsSource: loader,
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
  {
    numAdded: 2,
    numUpdated: 0,
    numDeleted: 0,
    numSkipped: 0,
  }
*/

// Closing resources
await recordManager.end();
await vectorStore.end();

API Reference

Next steps

You've now learned how to use indexing in your RAG pipelines.

Next, check out the other sections on retrieval.

