协调器

欢迎来到 dora 教程的最后一章!在本教程中,我们学习了数据流节点操作符 ,以及它们如何通过事件流进行通信并高效地传输数据消息/Arrow Data。 我们还了解了如何使用 API 绑定编写逻辑,以及如何使用 Dora CLI 作为命令行界面。最近,在 Dora 守护进程 中,我们学习了 Dora 守护进程,它是在单台机器上本地管理 dora 组件的后台服务。

但是,当你的应用程序变得更大,需要使用分布在多台计算机上的传感器、处理器或硬件时,会发生什么情况呢?例如,你可能会遇到:

  • 一个机器人上的一个摄像头。
  • 在强大的台式计算机上运行的重型 AI 模型。
  • 另一块硬件上的电机控制器。
  • 笔记本电脑上的可视化显示。

您的 Dataflow 需要跨越所有这些机器。您将在每台机器上运行一个 Dora Daemon 来管理本地节点 。但是,如何让系统在正确的机器上启动正确的节点 ,协调它们之间通过网络的通信,并管理这个分布式应用程序的整个生命周期呢?

这就是 Dora Coordinator 协调器 的工作。

空中交通管制塔

可以将 Dora 协调器 想象成整个 dora 系统的中央空中交通管制塔 ,尤其是在数据流分布于多台机器的情况下。每个 Dora 守护进程都是其本地工厂车间的“领班”(管理一台机器上的节点),而协调器则是高级管理者,负责监督所有不同的工厂车间,并确保所有设备和谐地协同工作。

其主要职责是:

  1. 接收命令: 它从 Dora CLI 接收有关整个数据流的指令(例如“启动此数据流”、“停止该数据流”)。
  2. 了解景观: 它跟踪不同机器上可用的 Dora Daemons
  3. 数据流编排: 它读取数据流 YAML ,了解哪些节点需要在哪台机器上运行(如果指定),并告诉适当的 Dora 守护进程启动这些节点 。
  4. 管理生命周期: 它管理数据流的整体状态(运行、停止等)并协调跨多台机器停止或重新加载节点等命令。
  5. 协调通信(高级): 守护进程处理本地数据传输(如共享内存 )时,协调器确保底层传输(如 Zenoh,通常用于分布式设置)配置正确,以便数据可以按照数据流蓝图在不同机器上的守护进程之间流动。

本质上,协调器是连接用户(通过 CLI)和守护进程和节点的分布式集合的大脑。

将协调器与分布式数据流结合使用

当你使用 dora run yolo.yml 在一台机器上本地运行数据流时,CLI 可能会直接与本地 Dora Daemon 通信,或者它可能会自动启动并与本地 Coordinator 进程通信,然后由该进程管理本地 Daemon。对于在单机上运行的初学者来说,Coordinator 的存在可能一开始不太明显。

然而,当你定义一个跨多台机器的数据流时,协调器就变得至关重要,并且需要明确指定。你可以使用数据流 YAML 文件中的 _unstable_deploy: machine 字段来告诉 dora 每个节点应该在哪里运行。

以下是分布式数据流的简化示例 YAML

nodes:
  # 机器 A 上的摄像头节点(例如,机器人)
  - id: camera-robot
    path: camera-node-code # 代码在机器人上运行
    outputs:
      - image
    _unstable_deploy: # 告诉 Dora 将此节点部署到哪里
      machine: robot-machine-A # 需要在机器“robot-machine-A”上运行守护进程

  # 机器 B 上的对象检测节点(例如,一台功能强大的 PC)
  - id: object-detector-pc
    path: yolo-node-code # 代码在PC上运行
    inputs:
      image: camera-robot/image # 接收来自机器人A的图像
    outputs:
      - bbox
    _unstable_deploy:
      machine: desktop-machine-B # 需要在机器“desktop-machine-B”上运行守护进程

  # 在机器 C 上绘图的节点(例如笔记本电脑)
  - id: plot-laptop
    path: plot-node-code # 代码在笔记本电脑上运行
    inputs:
      image: camera-robot/image # 接收来自机器人A的 image
      boxes2d: object-detector-pc/bbox # 从桌面计算机 B 接收 bbox
    _unstable_deploy:
      machine: laptop-machine-C # 需要在机器“laptop-machine-C”上运行守护进程

在此设置中,您将拥有:

  • robot-machine-A 上运行的 Dora Daemon
  • desktop-machine-B 上运行的 Dora Daemon
  • laptop-machine-C 上运行的 Dora Daemon
  • 至关重要的是,单个 Dora Coordinator 进程在所有这些机器可以访问的地方运行(它甚至可以在机器人、PC 或笔记本电脑之一上,或单独的服务器上)。

要运行这个分布式数据流,您可以使用 Dora CLI 并告诉它协调器的位置,通常使用 --coordinator-addr 标志:

# 假设协调器在 IP 地址 192.168.1.100 上运行
dora run distributed_dataflow.yml --coordinator-addr 192.168.1.100 

执行此命令时:

  1. Dora CLI 程序启动。
  2. 它连接到在 192.168.1.100 运行的 Dora Coordinator 进程。
  3. 它将 distributed_dataflow.yml 内容发送给协调器。
  4. 协调器读取 YAML 并查看哪些节点分配给哪些机器( robot-machine-Adesktop-machine-Blaptop-machine-C )。
  5. 协调器识别与这些机器 ID 关联的 Dora 守护进程 (守护进程在启动时向协调器注册)。
  6. 协调器向 robot-machine-A 上的守护进程发送指令以启动 camera-robot 节点。
  7. 协调器向 desktop-machine-B 上的守护进程发送指令以启动 object-detector-pc 节点。
  8. 协调器向 laptop-machine-C 上的守护进程发送指令以启动 plot-laptop 节点。
  9. 协调器还确保正确设置底层分布式通信层(如 Zenoh),以便数据在承载连接节点的守护进程之间流动( camera-robot 输出 -> object-detector-pc 输入、 camera-robot 输出 -> plot-laptop 输入、 object-detector-pc 输出 -> plot-laptop 输入)。
  10. 协调器继续运行,监视守护进程和数据流的状态,并将状态或日志消息(如果需要)传回 CLI

要停止此分布式数据流,您可以再次使用 CLI,告诉它与哪个协调器对话:

dora stop my_dataflow_id --coordinator-addr 192.168.1.100

CLI 告诉协调器停止数据流,然后协调器向相关守护进程发送停止命令,守护进程又向它们管理的节点发送 STOP 事件。

协调器是如何工作的

Dora 协调器作为独立、专用的后台进程运行。其主要目标是维护跨多台机器的 dora 系统的全局视图。

  • 通信: 它通过 TCP/IPDora 守护进程通信。守护进程启动时会连接到协调器。
  • 状态管理: 它保存正在运行的数据流的整体状态,包括哪些节点属于哪个数据流以及每个节点被分配到哪个守护进程/机器。
  • 事件循环: 与其他 dora 组件一样,Coordinator 也有一个事件循环。它会等待并处理不同的事件:
  • 新的守护进程连接。
  • 来自 Dora CLI 的请求(例如, StartStopListBuildLogsDestroy )。
  • 来自守护进程的消息(例如,“我的守护进程 ID 是......”,“数据流 X 已完成结果......”,“我的机器上的节点 Y 在订阅之前退出”)。
  • 内部事件(如心跳定时器,用于检查守护进程是否仍然活跃)。
  • 编排逻辑: 当它收到数据流的 Start 等命令时:
  • 它解析 YAML 来了解节点部署。
  • 它检查其连接的守护进程列表,以查看所需的机器是否可用。
  • 然后,它将特定的 DaemonCoordinatorEvent 消息(如 Spawn(SpawnDataflowNodes) )发送给相关的守护进程,告诉每个守护进程它负责为这个特定的数据流启动哪些节点。
  • 它跟踪来自守护进程的回复( DaemonCoordinatorReply )以确认节点已成功生成或构建( DataflowSpawnResultDataflowBuildResult )。
  • 一旦所有需要的节点在各自的守护进程上报告准备就绪,协调器就会向所有涉及的守护进程发送 AllNodesReady 事件,节点可以使用该事件(例如开始发送数据)。
  • 当它收到 Stop 命令时,它会向所有涉及的守护进程发送 StopDataflow 事件。

下面是一个简化的序列图,说明如何启动分布式数据流:

coordinator

这表明协调器充当着协调分布式守护进程动作的中心点。

您可以在 dora-coordinator 包 ( binaries/coordinator/src/lib.rs ) 中找到 Dora Coordinator 的核心实现。该文件包含 start 函数,该函数用于设置传入连接(来自守护进程和 CLI 控制平面)的监听器,以及处理不同事件类型( Event::NewDaemonConnectionEvent::ControlEvent::Daemon 等)的主异步循环 ( start_inner )。

协调器与守护进程之间交换的消息(例如 DaemonCoordinatorEventDaemonCoordinatorReply )定义在 libraries/message/src/coordinator_to_daemon.rslibraries/message/src/daemon_to_coordinator.rs 中。CLI 命令定义在 libraries/message/src/cli_to_coordinator.rs 中。协调器的事件循环接收这些消息并触发相应的操作。

总结

Dora Coordinatordora 系统的中央编排器和指挥中心,对于管理分布在多台机器上的数据流至关重要。它接收来自 Dora CLI 的指令,跟踪可用的 Dora Daemons ,并根据数据流 YAML 蓝图协调在正确的机器上启动、停止和管理节点 。通过监督分布式系统,它使您能够无缝构建和管理复杂的多机器应用程序。

至此,我们对 dora 核心概念的入门教程就结束了。我们介绍了数据流构建块(节点、操作符)通信机制(事件流、数据消息/Arrow Data、API 绑定)以及运行时基础架构(命令行界面、守护进程、协调器)。现在,您对 dora 工作原理及其主要组件的作用有了基本的了解,这将帮助您更深入地构建自己的实时数据流应用程序。