Pinecone+Kubernetes扩展AI应用

2026-05-14 11
Pinecone

类型:数据库

简介:实时且性能出色的向量数据库,专门针对大规模向量搜索进行优化。

本篇中将更详细地探讨视频搜索应用的数据摄取管道(ingestion pipeline) 架构。我们会逐一分析每个微服务的功能,以及它们是如何协同工作的。

一、整体架构回顾

本篇重点关注 摄取服务,以及它们如何协作生成可搜索的视频应用。与许多 AI 系统类似,我们的管道中包含一些计算密集型的预处理步骤,同时 AI 功能本身也需要大量计算,因此在设计系统架构时必须考虑这些因素。

管道的架构目标是:随着数据量增加,系统可以轻松扩展,同时整体处理时间不会大幅增长。我们主要通过两种方式实现这一目标:

  • 将原始数据拆分为更小的工作单元
  • 在多个工作节点之间并行计算

正如前文所述,为了系统正常运行,每个工作节点必须是 无状态(stateless) 的。原因有三个:

  1. 确保多个客户端或消费者可以从每个工作节点获得一致的行为。
  2. 保证某个节点失败时,其他节点可以接管任务而不影响整体流程,从而提高 高可用性(high availability)容错性(fault tolerance)
  3. 简化扩展:无状态节点可以根据负载增加轻松在多台机器上复制,而不需要维护复杂状态。

虽然每个节点是无状态的,但我们仍需要一种方式来存储和传递数据。为此,我们使用 S3 作为分布式数据存储,并使用 Kafka 传递数据处理信息。注意,我们并不直接通过 Kafka 传输数据本身,而是传递存储在 S3 的数据引用。

二、Kafka 与并行处理

Kafka 是一个分布式流处理平台,设计用于高吞吐、低延迟的实时数据处理。它基于 发布-订阅(publish-subscribe) 模型:生产者发送数据到主题(topic),消费者订阅主题并处理数据。

Kafka 的强大之处在于它的 并行化能力。每个主题可以包含多个 分区(partition),每个分区在同一时间只能被一个消费者读取。这样,如果分区数量与消费者数量相等,每个消费者恰好读取一个分区,从而充分利用处理能力。

在本应用中,我们使用了两个 Kafka 主题:

  • video-segments:视频被拆分成小段后,每段发送到该主题。帧提取器(Frame Extractor)订阅此主题,每个实例从中取出一个视频段进行帧提取。
  • frames:每帧保存到 S3 后,会发送到此主题。订阅的 Indexer 消费者处理这些帧,实现对象检测、生成嵌入向量(embedding)并存入索引。

通过这种方式,我们可以将处理任务分配给多个消费者,实现 并行化处理,提升处理效率。

三、视频拆分器(Video Splitter)

视频拆分器负责下载视频、将视频拆分为多个小段,并上传到 AWS S3。它使用 fluent-ffmpeg 处理视频任务,如拆分视频段。

主要流程如下:

  1. 下载视频:使用 ytdl-core 下载视频并保存为本地 MP4 文件。
  2. 拆分视频:同一函数通过 fluent-ffmpegchunkDuration 将视频拆分成多个片段。
  3. 上传到 S3:每个视频段通过 @aws-sdk/client-s3 上传至 S3。
  4. 发送 Kafka 消息:上传完成后,将视频段信息发送到 video-segments 主题,用于下游帧提取器消费。

视频拆分器代码示例:


const split = async (videoPath, name, fps, chunkDuration, videoLimit) => {
  const outputFolder = join(__dirname, `temp_files/${name}`);
  await fsPromises.mkdir(outputFolder, { recursive: true });
  const duration = await videoDuration(videoPath);
  const videoDurationsInSeconds = Math.min(duration, videoLimit);
  const numberOfChunks = Math.ceil(videoDurationsInSeconds / chunkDuration);

  await new Promise((resolve, reject) => {
    ffmpeg(videoPath)
      .outputOptions([
        "-c copy",
        "-map 0",
        `-segment_time ${chunkDuration}`,
        "-f segment",
        "-reset_timestamps 1",
        "-segment_start_number 0",
        "-copyinkf",
      ])
      .output(join(outputFolder, "part_%d.mp4"))
      .on("end", resolve)
      .on("error", reject)
      .run();
  });

  const videoOutputs = [];
  for (let i = 0; i < numberOfChunks; i++) {
    videoOutputs.push({
      videoPath: join(outputFolder, `part_${i}.mp4`),
      index: i,
    });
  }
  return videoOutputs;
};

四、帧提取器(Frame Extractor)

帧提取器会处理视频段,提取每一帧,并上传至 S3,同时保存帧的引用信息到 Redis,供索引器使用。

  • 提取帧:使用 extractFrames 函数将视频段保存为 PNG 文件。
  • 上传 S3:每帧上传 S3 后,通过 Kafka 将帧路径发送到 frames 主题,实现下游处理的并行化。
  • 日志记录:每处理完一帧,会调用 log 函数记录帧提取进度。

五、索引器(Indexer)

索引器是管道的核心,负责:

  • 从 S3 获取帧
  • 使用 detr-resnet-50 模型进行对象检测
  • 根据边界框(Bounding Boxes)裁剪图像,并保存到 S3 和 Redis
  • 使用 clip-vit-large-patch14 生成图像嵌入向量,并存入 Pinecone 数据库
  • 整个流程支持多实例并行处理,提高吞吐量

索引器代码示例:


const indexImages = async ({ name, limit, filesList }) => {
  const client = await getAwsS3Client();
  let list = [];

  if (!filesList && name) {
    const files = await client.send(new ListObjectsV2Command({
      Bucket: AWS_S3_BUCKET,
      Prefix: `${name}/frame`,
    }));
    list = files.Contents ? files.Contents.map(x => x.Key) : [];
    if (limit) list = list.slice(0, limit);
  } else if (!filesList) {
    throw new Error("No files list provided");
  } else {
    list = filesList;
  }

  const tasks = list.map(async (fileName) => {
    try {
      const segmentedFiles = (await segmentImage(fileName))?.filter(x => x) || [];
      await embedAndUpsert({ imagePaths: segmentedFiles, chunkSize: 100 });
      await log(`Done indexing ${fileName}`);
      await completeFile(fileName);
    } catch (error) {
      console.error(`Error processing file ${fileName}: ${error}`);
    }
  });

  await Promise.all(tasks);
};

六、集中日志记录(Centralized Logging)

为了方便监控系统整体运行情况,我们在 app-backend 中设置了日志接口:


const log = async (message, payload = {}) => {
  const podId = process.env.POD_NAME || "unknown";
  const formattedMessage = `${podId}: ${message}`;
  try {
    await fetch(`http://${BACKEND}/log`, {
      method: "POST",
      headers: {"Content-Type": "application/json"},
      body: JSON.stringify({ message: formattedMessage, payload }),
    });
  } catch (error) {
    console.error("Failed to log message", error);
  }
};

同时使用 trackFile 方法追踪每个服务处理的文件数量,使前端可以获取完整的处理进度。

总结

本文详细解析了摄取管道的架构与功能,重点讲解了视频拆分器、帧提取器和索引器,并展示了它们如何与 Kafka、S3 和 Pinecone 协同工作。通过这种设计,系统在处理海量视频数据时仍能保持高可用性、容错性和可扩展性。

下一篇中,我们将关注应用的消费端,了解前端如何查询和展示数据。

  • 广告合作

  • QQ群号:4114653

温馨提示:
1、本网站发布的内容(图片、视频和文字)以原创、转载和分享网络内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。邮箱:2942802716#qq.com(#改为@)。 2、本站原创内容未经允许不得转裁,转载请注明出处“站长百科”和原文地址。