
类型:数据库
简介:实时且性能出色的向量数据库,专门针对大规模向量搜索进行优化。
本篇中将更详细地探讨视频搜索应用的数据摄取管道(ingestion pipeline) 架构。我们会逐一分析每个微服务的功能,以及它们是如何协同工作的。
一、整体架构回顾
本篇重点关注 摄取服务,以及它们如何协作生成可搜索的视频应用。与许多 AI 系统类似,我们的管道中包含一些计算密集型的预处理步骤,同时 AI 功能本身也需要大量计算,因此在设计系统架构时必须考虑这些因素。
管道的架构目标是:随着数据量增加,系统可以轻松扩展,同时整体处理时间不会大幅增长。我们主要通过两种方式实现这一目标:
- 将原始数据拆分为更小的工作单元
- 在多个工作节点之间并行计算
正如前文所述,为了系统正常运行,每个工作节点必须是 无状态(stateless) 的。原因有三个:
- 确保多个客户端或消费者可以从每个工作节点获得一致的行为。
- 保证某个节点失败时,其他节点可以接管任务而不影响整体流程,从而提高 高可用性(high availability) 和 容错性(fault tolerance)。
- 简化扩展:无状态节点可以根据负载增加轻松在多台机器上复制,而不需要维护复杂状态。
虽然每个节点是无状态的,但我们仍需要一种方式来存储和传递数据。为此,我们使用 S3 作为分布式数据存储,并使用 Kafka 传递数据处理信息。注意,我们并不直接通过 Kafka 传输数据本身,而是传递存储在 S3 的数据引用。
二、Kafka 与并行处理
Kafka 是一个分布式流处理平台,设计用于高吞吐、低延迟的实时数据处理。它基于 发布-订阅(publish-subscribe) 模型:生产者发送数据到主题(topic),消费者订阅主题并处理数据。
Kafka 的强大之处在于它的 并行化能力。每个主题可以包含多个 分区(partition),每个分区在同一时间只能被一个消费者读取。这样,如果分区数量与消费者数量相等,每个消费者恰好读取一个分区,从而充分利用处理能力。
在本应用中,我们使用了两个 Kafka 主题:
- video-segments:视频被拆分成小段后,每段发送到该主题。帧提取器(Frame Extractor)订阅此主题,每个实例从中取出一个视频段进行帧提取。
- frames:每帧保存到 S3 后,会发送到此主题。订阅的 Indexer 消费者处理这些帧,实现对象检测、生成嵌入向量(embedding)并存入索引。
通过这种方式,我们可以将处理任务分配给多个消费者,实现 并行化处理,提升处理效率。
三、视频拆分器(Video Splitter)
视频拆分器负责下载视频、将视频拆分为多个小段,并上传到 AWS S3。它使用 fluent-ffmpeg 处理视频任务,如拆分视频段。
主要流程如下:
- 下载视频:使用 ytdl-core 下载视频并保存为本地 MP4 文件。
- 拆分视频:同一函数通过 fluent-ffmpeg 按
chunkDuration将视频拆分成多个片段。 - 上传到 S3:每个视频段通过 @aws-sdk/client-s3 上传至 S3。
- 发送 Kafka 消息:上传完成后,将视频段信息发送到 video-segments 主题,用于下游帧提取器消费。
视频拆分器代码示例:
const split = async (videoPath, name, fps, chunkDuration, videoLimit) => {
const outputFolder = join(__dirname, `temp_files/${name}`);
await fsPromises.mkdir(outputFolder, { recursive: true });
const duration = await videoDuration(videoPath);
const videoDurationsInSeconds = Math.min(duration, videoLimit);
const numberOfChunks = Math.ceil(videoDurationsInSeconds / chunkDuration);
await new Promise((resolve, reject) => {
ffmpeg(videoPath)
.outputOptions([
"-c copy",
"-map 0",
`-segment_time ${chunkDuration}`,
"-f segment",
"-reset_timestamps 1",
"-segment_start_number 0",
"-copyinkf",
])
.output(join(outputFolder, "part_%d.mp4"))
.on("end", resolve)
.on("error", reject)
.run();
});
const videoOutputs = [];
for (let i = 0; i < numberOfChunks; i++) {
videoOutputs.push({
videoPath: join(outputFolder, `part_${i}.mp4`),
index: i,
});
}
return videoOutputs;
};
四、帧提取器(Frame Extractor)
帧提取器会处理视频段,提取每一帧,并上传至 S3,同时保存帧的引用信息到 Redis,供索引器使用。
- 提取帧:使用 extractFrames 函数将视频段保存为 PNG 文件。
- 上传 S3:每帧上传 S3 后,通过 Kafka 将帧路径发送到 frames 主题,实现下游处理的并行化。
- 日志记录:每处理完一帧,会调用 log 函数记录帧提取进度。
五、索引器(Indexer)
索引器是管道的核心,负责:
- 从 S3 获取帧
- 使用 detr-resnet-50 模型进行对象检测
- 根据边界框(Bounding Boxes)裁剪图像,并保存到 S3 和 Redis
- 使用 clip-vit-large-patch14 生成图像嵌入向量,并存入 Pinecone 数据库
- 整个流程支持多实例并行处理,提高吞吐量
索引器代码示例:
const indexImages = async ({ name, limit, filesList }) => {
const client = await getAwsS3Client();
let list = [];
if (!filesList && name) {
const files = await client.send(new ListObjectsV2Command({
Bucket: AWS_S3_BUCKET,
Prefix: `${name}/frame`,
}));
list = files.Contents ? files.Contents.map(x => x.Key) : [];
if (limit) list = list.slice(0, limit);
} else if (!filesList) {
throw new Error("No files list provided");
} else {
list = filesList;
}
const tasks = list.map(async (fileName) => {
try {
const segmentedFiles = (await segmentImage(fileName))?.filter(x => x) || [];
await embedAndUpsert({ imagePaths: segmentedFiles, chunkSize: 100 });
await log(`Done indexing ${fileName}`);
await completeFile(fileName);
} catch (error) {
console.error(`Error processing file ${fileName}: ${error}`);
}
});
await Promise.all(tasks);
};
六、集中日志记录(Centralized Logging)
为了方便监控系统整体运行情况,我们在 app-backend 中设置了日志接口:
const log = async (message, payload = {}) => {
const podId = process.env.POD_NAME || "unknown";
const formattedMessage = `${podId}: ${message}`;
try {
await fetch(`http://${BACKEND}/log`, {
method: "POST",
headers: {"Content-Type": "application/json"},
body: JSON.stringify({ message: formattedMessage, payload }),
});
} catch (error) {
console.error("Failed to log message", error);
}
};
同时使用 trackFile 方法追踪每个服务处理的文件数量,使前端可以获取完整的处理进度。
总结
本文详细解析了摄取管道的架构与功能,重点讲解了视频拆分器、帧提取器和索引器,并展示了它们如何与 Kafka、S3 和 Pinecone 协同工作。通过这种设计,系统在处理海量视频数据时仍能保持高可用性、容错性和可扩展性。
下一篇中,我们将关注应用的消费端,了解前端如何查询和展示数据。

