操作符

欢迎回来!在数据流中,我们学习了 dora 应用程序的整体蓝图以及它如何连接各个部分。在节点中,我们深入探讨了这些节点部分,并了解到每个节点通常作为一个独立的进程运行,执行特定的任务,并通过数据流 YAML 中定义的输入和输出进行通信。

现在,让我们探索一种在 Node 内部添加更多结构和可重用性的方法: Operator 操作符 的概念。

节点内部更小、可重复使用的步骤

假设你有一个负责处理图像的节点。该节点可能需要执行几个不同的步骤:

  1. 将图像转换为灰度。
  2. 将图像调整为特定尺寸。
  3. 对图像应用过滤器。
  4. 在过滤后的图像中寻找边缘。

你可以在一个节点的单一代码实现中编写所有这些逻辑。但是,如果你之后想在另一个节点或不同的数据流中重用“调整图像大小”或“应用滤镜”逻辑,你就必须复制粘贴代码,这并不理想。

这时, Operator 就派上用场了。Operator 是一种较小的模块化逻辑,设计用于在一种名为 Dora Runtime Node 的特殊节点内运行。Operator 允许你将单个节点的工作分解为可重用的独立步骤。

可以将节点想象成工厂车间的工作站(一栋独立的建筑),而操作符则是该工作站内的专用工具或机器。工作站(节点)是独立的单元,但其内部的工作由其工具集(操作符)完成。

操作符与自定义节点

让我们快速比较一下:

特征自定义节点Dora 运行时节点和操作符
运行方式独立节点单进程
包含一段自定义的代码一个或多个操作符
逻辑单元整个节点代码单个操作符
通讯使用 dora Node API 直接与 dora 系统交互节点运行时使用 dora Node API; 操作符使用 dora Operator API
可重用性重用整个 Node 进程重复使用单个操作符逻辑模块

因此,虽然自定义节点是其自己独立的程序( .py.rs 可执行文件等),但 Dora 运行时 节点是一个运行 dora-runtime 程序的进程,然后加载和管理数据流 YAML 中指定的一个或多个操作员的代码。

在数据流 YAML 中定义操作符

运算符是在 Dataflow YAML 中节点的定义内定义的。您无需为节点的实现指定单个 pathcustom 块,而是使用 operators 符块。

让我们想象一下之前的图像处理节点,现在使用操作符来实现。

dataflow.yml
nodes:
  # ... other nodes like camera ...

  # Define our image processing Node that uses Operators
  - id: image-processor-node # This is the ID of the Node process
    inputs:
      image: camera/image # This Node takes image data from the camera Node
    outputs:
      - processed_image # This Node outputs the final processed image

    # This node is a Runtime Node and will load operators
    operators:
      # First Operator: Convert to Grayscale
      - id: grayscale-converter # ID of the operator within THIS node
        source: python # How to load this operator's code
        path: operators/grayscale_op.py # The code file for this operator
        inputs:
          input_image: image-processor-node/image # Takes input from the NODE's image input
        outputs:
          - gray_image # Outputs grayscale image

      # Second Operator: Resize
      - id: resizer # ID of the second operator
        source: python
        path: operators/resize_op.py
        inputs:
          input_image: grayscale-converter/gray_image # Takes input from the 'gray_image' output of 'grayscale-converter' OPERATOR
        outputs:
          - resized_image # Outputs resized image

      # Third Operator: Apply Filter
      - id: filter
        source: python
        path: operators/filter_op.py
        inputs:
          input_image: resizer/resized_image # Takes input from the 'resized_image' output of 'resizer' OPERATOR
        outputs:
          - filtered_image # Outputs filtered image

      # Fourth Operator: Edge Detector
      - id: edge-detector
        source: python
        path: operators/edge_op.py
        inputs:
          input_image: filter/filtered_image # Takes input from the 'filtered_image' output of 'filter' OPERATOR
        outputs:
          - processed_image # Outputs the final processed image

    # Notice: the 'processed_image' output of the 'edge-detector' operator
    # matches the 'processed_image' output of the 'image-processor-node'.
    # The runtime routes this automatically.

需要注意的关键事项:

  • nodes 下的 id ( image-processor-node ) 是 Node 进程的 ID,和之前一样。
  • 这里有一个 operators 列表,而不是 pathcustom
  • operators 列表中的每个项目定义了在此节点内运行的操作员 。
  • 每个 Operator 都有自己的 id ( 在此节点内唯一)、 source (例如 pythonshared-library )和指向其代码的 path
  • 操作员也有 inputsoutputs
  • 连接语法仍然是 source_id/stream_id
  • 节点自己的 IDimage-processor-node/image ):从外部数据流进入节点的数据被路由到操作员的输入。
  • 同一节点内另一个 OperatorIDgrayscale-converter/gray_image ):数据在同一节点进程内的 Operator 之间直接流动。
  • 如果 Operator 的输出 idNode 的某个 outputs 匹配,则该输出将自动成为该 Node 到外部 Dataflow 的输出( edge-detector/processed_image 变为 image-processor-node/processed_image )。

这允许您在单个 Node 进程中定义一个迷你数据流。

操作员如何工作(操作员 API)

就像 Node 的代码使用 dora Node API 一样,Operator 的代码也使用 dora Operator API 。这个 API 更简单,因为 Operator 不需要担心网络连接或进程管理之类的事情;Runtime Node 会处理这些事情。

操作员的主要工作是对事件做出反应。最常见的事件是接收输入数据。

下面是我们的一个示例运算符(grayscale-converterPython 代码的简化概念图,代码片段非常简短):

operators/grayscale_op.py
# operators/grayscale_op.py

from dora import DoraStatus

class Operator:
    """Converts incoming image to grayscale."""

    def on_event(
        self,
        dora_event, # The event received by the operator
        send_output, # Function to send outputs
    ) -> DoraStatus:
        
        if dora_event["type"] == "INPUT":
            # Check if the input is the one we expect
            if dora_event["id"] == "input_image":
                image_data = dora_event["value"] # Get the incoming image data
                
                print(f"Received image on input_image.")
                
                # --- Operator's specific logic ---
                # (Actual image processing code goes here)
                grayscale_image_data = convert_to_grayscale(image_data) 
                # ---------------------------------

                # Send the result to the operator's output
                send_output("gray_image", grayscale_image_data, dora_event["metadata"]) 

        return DoraStatus.CONTINUE # Keep running

在此 Python 代码片段中:

  • Operator 代码的结构为 Python 类(或 Rust 中的函数)。
  • 它有一个 on_event 方法(或函数),每当发生与该操作符有关的事件(例如接收输入或停止信号)时, dora-runtime 就会调用该方法。
  • dora_event 字典包含有关事件的信息(其 typeidvalue 等。我们将在下一章介绍事件流 )。
  • send_output 函数(或方法)由运行时提供,供 Operator 在其定义的输出上发送数据。它接受输出 iddata 以及可选的 metadata

这个模式( on_event -> process -> send_output )是 Operator 逻辑的核心。

底层:运行时节点和操作符

当您使用 dora run 和定义了带有操作符的运行时节点的 Dataflow YAML 时,下面是发生情况的简化视图:

  1. dora CLIDora Daemon / Dora Coordinator 读取数据流 YAML
  2. 他们将 image-processor-node 标识为一个节点。
  3. 他们看到它有一个 operators 块,表明它是一个运行时节点。
  4. 它们为该节点启动一个单独的进程 。该进程运行 dora-runtime 可执行文件。
  5. dora-runtime 读取 image-processor-node 块中的 Operator 定义。
  6. 它根据 sourcepath 加载 grayscale-converterresizerfilteredge-detector 的代码。
  7. 它建立了沟通渠道:
    • 从外部数据流的 camera/imageimage-processor-node 的 image 输入。
    • 内部,从运行时节点的 image 输入到 grayscale-converter 操作器的 input_image 输入。
    • 在内部,从 grayscale-converter/gray_image 输出到 resizer/input_image 输入。
    • 内部,从 resizer/resized_image 输出到 filter/input_image 输入。
    • 内部,从 filter/filtered_image 输出到 edge-detector/input_image 输入。
    • edge-detector/processed_image 输出到外部 Dataflowimage-processor-node/processed_image 输出。
  8. 然后, dora-runtime 进入事件循环,从外部 Dataflow 接收事件(例如新图片),并根据 YAML 中定义的内部连接,将它们分发给相关的 Operator。当一个 Operator 生成输出时,运行时会将其内部路由到另一个 Operator,或者将其作为 Node 输出发送出去。

这是一个简化的序列图:

序列图

这显示了数据如何流入单个运行时节点进程,通过运行时在操作员之间内部传递,然后作为节点输出流出。

总结

在本章中,我们了解到 Operator 是运行在 Dora Runtime 节点进程中的模块化逻辑。它们在 Dataflow YAMLNode 块中定义,允许您在单个节点内构建复杂的处理流水线。Operator 使用 Operator API 接收输入并发送输出, dora-runtime 负责加载它们、在它们之间建立内部通信,以及在节点的外部输入/输出和 Operator 的输入/输出之间路由数据。这提供了一种创建可复用处理步骤的方法。

现在我们了解了节点和操作符,让我们看看数据在系统中移动的基本概念: Event Stream 事件流