Dora 数据流节点终极指南

引言:欢迎来到 Dora 的自动化工厂

欢迎您,未来的数据流架构师!请将 dora 想象成一个高度智能的自动化工厂,而您,就是这家工厂的总设计师。您的核心任务是设计一条或多条高效的流水线 (Dataflow),用于处理、转换和传输数据。

在这条流水线上,每一个独立的工作站,我们称之为 节点 (Node)

什么是节点 (Node)?

简单来说,一个节点就是一个独立的工人。它具备以下特征:

  • 身份:拥有唯一的工牌号 (id) 和供人阅读的职位描述
  • 技能:掌握一项特定的技能 (pathoperator),即它要执行的程序逻辑。
  • 协作:能从上游工位接收零件 (inputs),并在完成自己的工作后,将新产品放到传送带上,交给下游工位 (outputs)。

您的全部设计工作,都将体现在一份名为 dataflow.yml工厂总蓝图中。这份文档将是您学习如何绘制这份蓝图的终极指南。

根据蓝图的设计,工人主要有两种工作模式:

  1. 独立工人 (Standalone Process Node): 单独在一个工位上工作,拥有独立的进程空间。
  2. 工人团队 (Runtime Node): 一位工位管理员带领着一队操作符 (Operators) 在一个大的共享工位里高效协作。

现在,让我们深入每一个细节,解构节点的全部属性。

节点属性深度解析

我们将节点的所有属性划分为几个逻辑部分,并对每个属性进行“通俗讲解”和“参数详解”两个层面的分析。

第一部分:身份与元数据 (Identity & Metadata)

这些属性定义了工人的基本信息。

id

  • 通俗讲解 📛 每个工人都必须有一个独一无二的“工牌号”,就像我们的身份证号码一样。这是工厂调度系统识别和找到他的唯一凭证。
    id: camera_source_node
  • 参数详解
    • 类型: NodeId (String)
    • 含义: 节点的唯一标识符,是其在数据流拓扑中的主键。
    • 限制条件: 必需字段;在整个数据流中必须唯一;禁止包含正斜杠 (/) 字符。
    • 配合参数: 被其他节点的 inputs 字段引用,用于建立数据流连接。

name & description

  • 通俗讲解 💬 除了工牌号,我们还可以给工人起一个好听的“花名” (name),并写一份详细的“职位描述” (description)。这不会改变工人的技能,但能让工厂蓝图更容易被人类读懂。
    name: "高清摄像头画面采集器"
    description: "负责从 RealSense D435 深度相机捕获 1080p 的视频流。"
  • 参数详解
    • 类型: Option<String>
    • 含义: name 是简短的可读名称,description 是详细的功能说明。主要用于增强文档、日志和监控的可读性。
    • 限制条件: 均为可选字段,不要求唯一性。

第二部分:执行与行为 (Execution & Behavior)

这些属性决定了工人如何工作,以及他具体执行什么任务。

path

  • 通俗讲解 📜 这是“独立工人”的“技能手册”。它直接告诉工人应该运行哪个程序或脚本。如果一个节点有 path,它就是一个独立工作的工人。
    path: nodes/video_processing.py
  • 参数详解
    • 类型: Option<String>
    • 含义: 定义独立进程节点的可执行文件或脚本的路径。
    • 限制条件: 与 operatorsoperator 字段互斥
    • 配合参数: args, env, build。若与 git 配合,此路径通常相对于 Git 仓库的根目录。

args

  • 通俗讲解 🗣️ 给工人下达的“临时指令”。就像你告诉他:“嘿,今天的工作,请按照'加急模式'来办!”
    args: "--mode fast --quality high"
  • 参数详解
    • 类型: Option<String>
    • 含义: 传递给 path 指定的可执行文件的命令行参数字符串。
    • 限制条件: 仅对定义了 path 的独立进程节点有效。

env

  • 通俗讲解 📋 工厂里的“共享公告板”。你可以在上面写一些所有工人都需要知道的信息,比如“今天的生产目标:1000件”或“紧急出口在此方向”。
    env:
      API_KEY: "a_very_secret_key"
      MAX_RETRIES: 5
  • 参数详解
    • 类型: Option<BTreeMap<String, EnvValue>>
    • 含义: 为节点的构建和执行环境设置环境变量。
    • 限制条件: EnvValue 支持字符串、布尔值和数字。是传递敏感信息(如密钥)的最佳方式。

operators & operator

  • 通俗讲解 👨‍👩‍👧‍👦 这是“工人团队”模式的核心。一个节点如果没有 path,但有 operatorsoperator,那它就不是一个干活的工人,而是一个“工位管理员”(运行时节点)。他负责管理一整个操作符 (Operator) 团队在同一个大工位里高效协作。operatoroperators 只有一个成员时的简化写法。
    # 一个管理员,带领两个操作符
    operators:
      - id: detector
        python: detect.py
      - id: tracker
        python: track.py
  • 参数详解
    • 类型: Option<RuntimeNode> / Option<SingleOperatorDefinition>
    • 含义: 定义在运行时节点内部署的一个或多个轻量级操作符。它的存在标志着该节点是一个操作符宿主。
    • 限制条件: 与 path 字段互斥operatorsoperator 自身也互斥

第三部分:数据流连接 (Dataflow Connectivity)

这些属性定义了传送带,让零件在工人之间流动。

outputs

  • 通俗讲解 📤 工人在这里“晒出”他能生产的所有产品类型清单。比如:“我能生产'切割好的钢板'和'废料'两种东西。”
    outputs:
      - processed_video
      - detected_objects
  • 参数详解
    • 类型: BTreeSet<DataId> (Set of Strings)
    • 含义: 声明该节点可以产生的所有数据输出流的唯一标识符 (ID)。
    • 限制条件: 必需,但可为空 []。下游节点的 inputs 必须引用此列表中声明的 ID。

inputs

  • 通俗讲解 📥 工人在这里“下订单”,说明他需要哪些上游工人的哪些产品来完成自己的工作。这是连接流水线的关键。
    # 我需要一个叫 `raw_video` 的零件,它来自 `camera_node` 工人生产的 `video_stream`
    inputs:
      raw_video: camera_source_node/video_stream
  • 参数详解
    • 类型: BTreeMap<DataId, Input> (Map)
    • 含义: 定义节点的输入订阅。将一个本地输入 ID 映射到一个上游输出。
    • 语法: local_input_id: upstream_node_id/upstream_output_id
    • 限制条件: 引用的上游节点 ID 和输出 ID 必须在数据流中真实存在。

send_stdout_as

  • 通俗讲解 🎤 给工人安装一个“工作日志麦克风”。它能捕获工人在工作时的所有“碎碎念”(程序打印的日志),并把这些“声音”也当作一种产品放到传送带上,供其他工人(如质检员)分析。
    outputs:
      - main_product
      - work_logs # 别忘了在 outputs 里也声明这个日志产品
    send_stdout_as: work_logs
  • 参数详解
    • 类型: Option<String>
    • 含义: 将节点的标准输出 (stdout) 和标准错误 (stderr) 流重定向到一个具名的数据流输出通道。
    • 限制条件: 此处指定的名称必须同时在 outputs 列表中声明。
    • 配合参数: outputs

第四部分:源码管理与构建 (Source Management & Build)

这些属性让你的工厂实现了“云采购”和“岗前自动化培训”。

git, branch, tag, rev

  • 通俗讲解 📚 与其把技能手册 (path) 手动放到每个工位,不如让工人直接从中央“技能图书馆” (git) 下载。你可以指定下载最新的“草稿” (branch),某个“发行版” (tag),或者精确到某一页的“修订版” (rev)。
    git: https://github.com/dora-rs/dora.git
    tag: v0.4.0 # 推荐使用 tag 或 rev,保证版本稳定
  • 参数详解
    • 类型: Option<String>
    • 含义: git 指定仓库 URL;branch, tag, rev 用于版本控制,分别对应分支、标签和提交哈希。
    • 限制条件: branch, tag, rev 互斥,且必须与 git 配合使用。

build

  • 通俗讲解 🛠️ “岗前培训”。有些技能手册是“散装的零件”(源代码),需要先“组装”(编译)成可用的工具。build 就是这个组装指令。
    build: cargo build --release
  • 参数详解
    • 类型: Option<String>
    • 含义: 在 dora build 阶段执行的构建命令。
    • 限制条件: env 中定义的环境变量对此命令生效。

源码分析

dora 的核心库中详细定义了 Node 节点的参数。

文件 libraries/message/src/descriptor.rs115行

descriptor.rs
/// # Dora Node Configuration
///
/// A node represents a computational unit in a Dora dataflow. Each node runs as a
/// separate process and can communicate with other nodes through inputs and outputs.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Node {
    pub id: NodeId,
    pub name: Option<String>,
    pub description: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub args: Option<String>,
    pub env: Option<BTreeMap<String, EnvValue>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operators: Option<RuntimeNode>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator: Option<SingleOperatorDefinition>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub custom: Option<CustomNode>,
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,
    #[serde(default)]
    pub inputs: BTreeMap<DataId, Input>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub send_stdout_as: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub build: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub git: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub branch: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tag: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rev: Option<String>,
    #[schemars(skip)]
    #[serde(rename = "_unstable_deploy")]
    pub deploy: Option<Deploy>,
}

详细字段分析

核心身份信息 (Identity)

id: NodeId

  • 作用: 必需的唯一标识符。这是节点在数据流图中的“身份证”,所有节点的 id 必须唯一,且不能包含 / 字符。inputs 字段用它来路由数据。

name: Option<String>

  • 作用: 可选的、人类可读的名称。主要用于文档和日志,以提高可读性。

description: Option<String>

  • 作用: 可选的、对节点功能的详细文字描述。进一步增强了数据流图的可读性和可维护性。

独立进程节点配置 (Standalone Node Configuration)

path: Option<String>

  • 作用: 定义独立进程节点的关键。指向一个可执行文件(如 Rust 编译产物)或脚本(如 Python 脚本)的路径。dora 会运行这个文件。如果一个 Node 定义了 path,它就是一个独立进程节点。

args: Option<String>

  • 作用: 传递给 path 指定的可执行文件的命令行参数。

env: Option<BTreeMap<String, EnvValue>>

  • 作用: 为节点进程(包括构建和运行阶段)设置环境变量。

运行时与操作符配置 (Runtime & Operator Configuration)

operators: Option<RuntimeNode>

  • 作用: 定义运行时节点的关键。包含一个或多个操作符(Operator)的定义。如果一个 Node 定义了 operators 字段(而没有 path 字段),它就成为一个运行时节点。

operator: Option<SingleOperatorDefinition>

  • 作用: operators 字段的便利版。当一个运行时节点只包含一个操作符时,可以使用此字段来简化 YAML 配置。它和 operators 是互斥的。

数据流连接 (Dataflow Connectivity)

outputs: BTreeSet<DataId>

  • 作用: 声明该节点产生的所有输出流的 ID。这是一个集合,列出了所有该节点可能发送数据的“通道名称”。下游节点通过这些 ID 来订阅数据。

inputs: BTreeMap<DataId, Input>

  • 作用: 定义该节点的输入流连接。这是一个映射表,key 是本节点内部使用的输入名称 (input_id),value (Input) 则指明了该输入来源于哪个上游节点 (source_node_id) 的哪个输出 (source_node_output_id)。这是构建数据流图(DAG)连接关系的核心。

send_stdout_as: Option<String>

  • 作用: 一个非常实用的功能,可以将节点进程的标准输出(stdout)和标准错误(stderr)重定向到一个 dora 的数据输出流中。这使得日志和调试信息可以像普通数据一样在数据流中传递。

构建与源码管理 (Build & Source Management)

build: Option<String>

  • 作用: 定义在 dora build 期间需要执行的构建命令。例如 cargo buildpip install。这使得 dora 可以直接从源代码构建节点。

git: Option<String>

  • 作用: 指定一个 Git 仓库地址。dora build 会自动克隆这个仓库,使得数据流可以轻松分发和复现。

branch: Option<String>, tag: Option<String>, rev: Option<String>

  • 作用: 与 git 字段配合使用,用于检出(checkout)指定的 Git 分支、标签或提交版本。这确保了构建的一致性。

其他字段

custom: Option<CustomNode>

  • 作用: 已弃用 (deprecated) 的旧版字段。源码注释明确指出应使用顶层的 path, args 等字段。

deploy: Option<Deploy>

  • 作用: 用于机器部署的非稳定(unstable)配置。从 #[schemars(skip)] 属性看,它不包含在公开的配置 schema 中,属于内部或实验性功能。

技术方案总结

这份最新的 Node 定义展示了一个成熟且高度灵活的设计方案:

  1. 声明式与配置驱动: 整个数据流图完全通过 YAML 文件进行声明式定义。Node 结构体是这个声明的核心,它是一个纯粹的数据容器,可以被 serde 完美地解析。
  2. 隐式模式切换: 通过检查 pathoperators/operator 字段的存在与否,dora 能够智能地推断出节点是应作为独立进程运行,还是作为轻量级操作符的宿主运行。这种设计使得 YAML 语法非常直观和扁平。
  3. 构建与源码集成: 通过 gitbuild 字段,dora 将节点的源码管理和构建过程无缝集成到其工作流中,极大地提升了项目的可移植性和可复现性。
  4. 清晰的关注点分离:
    • 身份 (who): id, name, description
    • 行为 (what): pathoperators
    • 连接 (how): inputs, outputs
    • 准备 (prepare): build, git

最佳实践与最终建议

现在你已经是 dora 节点配置专家了!一个全功能的节点定义可能看起来像下面这样,但别怕,你已经理解了每一部分!

dataflow.yml
# 这是一个 dora 数据流的综合示例文件 (dataflow.yml)
# 它展示了如何使用 Node 的各种属性来定义一个复杂的多节点应用。

nodes:
  # --------------------------------------------------------------------------
  # 节点 1: 独立进程节点 (Standalone Rust Node) - 从 Git 构建
  # 职责: 模拟从摄像头捕获视频帧。
  # 特点: 使用 git, tag, build, env, 和 send_stdout_as 字段。
  # --------------------------------------------------------------------------
  - id: video_capture
    name: "Camera Capture Node (Rust)"
    description: "从 Git 仓库构建的 Rust 节点,模拟捕获视频帧并发送。同时,它的标准输出会被重定向为 'log_stream' 输出。"

    # 源码管理: 指定从哪个 Git 仓库获取源代码
    git: https://github.com/dora-rs/dora.git
    # 版本控制: 锁定到 v0.4.0 标签,确保构建的可复现性
    tag: v0.4.0

    # 构建命令: 在 dora build 期间执行的命令
    build: cargo build --release -p camera-source

    # 环境变量: 为构建和运行环境设置环境变量
    env:
      RUST_LOG: info
      CAMERA_ID: 0

    # 运行配置: 构建完成后,要执行的程序路径
    # 注意:路径是相对于 git 仓库的根目录
    path: target/release/camera-source

    # 输出: 声明此节点会产生两个输出流
    outputs:
      - video_frame # 主要的数据输出
      - log_stream  # 用于接收标准输出的流

    # 特殊功能: 将此节点的 stdout 和 stderr 重定向到名为 'log_stream' 的输出流
    send_stdout_as: log_stream

  # --------------------------------------------------------------------------
  # 节点 2: 独立进程节点 (Standalone Python Node)
  # 职责: 调整接收到的视频帧尺寸。
  # 特点: 使用 args, inputs, 和 branch 字段 (演示与 tag 的不同)。
  # --------------------------------------------------------------------------
  - id: frame_resizer
    name: "Image Resizer (Python)"
    description: "一个 Python 节点,用于调整图像大小。从开发分支获取代码。"

    # 使用同一个 git 仓库,但切换到主开发分支
    git: https://github.com/dora-rs/dora.git
    branch: main

    # 运行配置: 直接指定 Python 脚本路径和命令行参数
    path: examples/python-dataflow/nodes/resize.py
    args: "--width 640 --height 480"

    # 输入: 定义数据来源
    # 'image' 输入流 -> 连接到 'video_capture' 节点的 'video_frame' 输出
    inputs:
      image: video_capture/video_frame

    # 输出: 声明会产生一个名为 'resized_image' 的输出
    outputs:
      - resized_image

  # --------------------------------------------------------------------------
  # 节点 3: 运行时节点 (Runtime Node) - 托管多个操作符
  # 职责: 作为宿主进程,运行两个轻量级操作符。
  # 特点: 使用 operators 字段,没有 path 字段。
  # --------------------------------------------------------------------------
  - id: processing_hub
    name: "AI & Data Processing Hub"
    description: "这是一个运行时节点,它本身不执行逻辑,而是为其内部的操作符提供运行环境。"

    # 注意:没有 path, args, build 等字段,因为它不是一个独立的进程节点。
    
    # 操作符定义:
    operators:
      # 运行时内的第一个操作符
      - id: yolo_detector
        name: "YOLOv8 Detection Operator"
        description: "对输入的图片运行目标检测算法。"
        # 操作符的实现代码
        python: path/to/yolo_operator.py
        # 操作符的输入
        inputs:
          image: frame_resizer/resized_image
        # 操作符的输出
        outputs:
          - detections # 检测结果
          - metadata   # 附加元数据

      # 运行时内的第二个操作符 (设想中的 Rust 操作符)
      - id: aggregator
        name: "Data Aggregator Operator"
        description: "聚合来自多个源的数据。"
        # 设想中的 Rust 共享库操作符
        shared_library: path/to/libaggregator.so
        # 操作符的输入
        inputs:
          # 可以订阅来自其他操作符或其他节点的数据
          yolo_meta: yolo_detector/metadata
          camera_log: video_capture/log_stream
        # 操作符的输出
        outputs:
          - summary_report

  # --------------------------------------------------------------------------
  # 节点 4: 最终的日志和存储节点 (Sink Node)
  # 职责: 收集所有最终结果并打印日志。
  # 特点: 拥有多个复杂的输入,是数据流的终点。
  # --------------------------------------------------------------------------
  - id: sink_logger
    name: "Final Logger"
    description: "接收所有处理结果并将其打印到控制台。"
    path: examples/python-dataflow/nodes/logger.py
    
    # 输入: 演示如何订阅来自不同节点和操作符的多个数据流
    inputs:
      detections_input: processing_hub/yolo_detector/detections
      report_input: processing_hub/aggregator/summary_report
      raw_logs_input: video_capture/log_stream

    # 没有 outputs 字段,因为它是数据流的终点(Sink)。
  1. ID 清晰唯一: 这是数据流的基石,请认真命名。
  2. 文档即代码: 善用 namedescription,让你的数据流蓝图自己会说话。
  3. 拥抱版本控制: 在生产环境中,永远使用 git + tagrev 的组合,杜绝不确定性。
  4. 配置外部化: 尽量使用 envargs 传入配置,保持代码的通用性。
  5. 调试利器: 对于关键或复杂的节点,毫不犹豫地使用 send_stdout_as,它会在未来为你节省大量调试时间。
  6. 合理选择模式: 从简单的独立工人模式开始。当遇到性能瓶颈或多个节点需要紧密、低延迟通信时,再重构为工人团队(运行时 + 操作符)模式。

现在,您已经拥有了设计 dora 节点所需的所有知识,从高层概念到技术细节。是时候打开您的 dataflow.yml,开始构建属于您自己的、高效、智能的自动化数据工厂了!