事件流处理

欢迎回到 dora 教程!在节点中,我们了解到节点是数据流中独立的工作单元。在操作符中,我们了解了操作符如何在一种特殊的节点内部构建工作。

节点操作符都需要一种方法来感知需要关注的事件发生——例如新数据到达或停止命令。它们是如何接收这些重要通知的?

这就是 Event Stream 事件流 发挥作用的地方。

节点(或操作符)的收件箱

想象一下,一个节点或操作符就像一个坐在办公桌前准备处理任务的微型办公室职员。他们不会随意开始工作;他们需要被告知新任务何时到来以及任务是什么 。

事件流就像是节点或操作符的收件箱或通知源 。它是一个连续的消息流,告知工蜂完成工作所需的一切信息。

无论您的代码是作为独立节点进程运行还是作为运行时节点内的操作符运行,它都会通过其专用事件流接收信息。

事件流中包含哪些类型的消息?

事件流不仅仅用于传入数据。它包含不同类型的消息, dora 称之为 “事件” 。这些事件会告诉您的节点或操作符数据流和系统中正在发生的事情。

以下是您通常会在信息流中发现的主要事件类型:

  • INPUT: 这是最常见的事件。它表示新数据已到达节点(或算子)定义的某个输入。该事件将包含数据本身,并告知您数据来自哪个输入。这就是节点和算子接收需要处理的数据的方式。
  • InputClosed: 此事件表示节点(或算子)的某个输入源已关闭或断开连接。如果正在为此输入生成数据的节点/算子完成其工作或崩溃,则可能会发生这种情况。您的代码可能需要对此做出反应,例如在关键输入丢失时优雅地关闭。
  • STOP: 此事件是来自 dora 运行时的命令,告知 NodeOperator 关闭。当您使用 dora CLI 停止数据流,或者数据流自然结束时(例如,所有输入都关闭,没有其他工作要做)时,就会发生这种情况。您的代码应该监听此事件并干净地退出。
  • RELOAD: 此事件表示节点或操作符应重新加载其配置或逻辑(如果支持热重载)。这是一项高级功能,允许在不完全重启的情况下更改数据流。
  • ERROR: 表示事件流本身发生了不可恢复的错误。

节点和操作符如何监听流

您的节点或操作符代码使用您选择的语言的 dora API 绑定与事件流进行交互(我们将在第 6 章:API 绑定中介绍 API)。

典型的模式是进入一个循环,反复向事件流询问“下一个”事件。然后,代码检查事件的类型及其详细信息,以决定采取什么操作。

下面是一个简化的 Python 示例,展示了节点如何监听其事件流:

from dora import Node

# Initialize the node - connects to the dora runtime
node = Node() 
print("Node initialized. Waiting for events...")

# Enter the loop to process events from the stream
for event in node: 
    event_type = event["type"]
    event_id = event["id"] # Often the input ID

    print(f"Received event: Type={event_type}, ID={event_id}")

    if event_type == "INPUT":
        # Handle incoming data
        print(f"  -> Data received on input '{event_id}'")
        data = event["value"] 
        # Now you would process the 'data'...
        
        # Example: If it's from an input named 'camera_image'
        if event_id == "camera_image":
            print("  -> Processing camera image...")
            # process_image(data)
            # node.send_output(...) # Send results if needed
            pass # Placeholder for actual processing

    elif event_type == "STOP":
        # Handle stop command
        print("  -> Received STOP command. Exiting loop.")
        break # Exit the main loop

    elif event_type == "InputClosed":
        # Handle input closing
        print(f"  -> Input '{event_id}' closed.")
        # Decide if the node should continue or stop

    elif event_type == "RELOAD":
        # Handle reload command
        print("  -> Received RELOAD command.")
        # Reload configuration if implemented

    elif event_type == "ERROR":
        # Handle a stream error
        print(f"  -> Received ERROR: {event['error']}")
        # Log or react to the error

print("Node stopping gracefully.")

这个简单的循环是许多 dora NodeOperator 实现的核心。dora APIfor event in node: 处理了等待流中下一个事件的复杂任务,并将其呈现为一个简单的类似字典的对象( eventdora 以便您的代码可以轻松理解和响应。

Operator 响应事件的代码结构非常相似,但它使用 Operator APIon_event 函数,而不是直接循环遍历 Node 对象(如操作符 中所示)。原理相同:接收事件,检查其类型和 ID,并做出相应的响应。

底层:Stream 的工作原理

dora 运行时Dora DaemonDora Coordinator )基于您的 Dataflow YAML 启动 Node 进程时,它还会设置形成该特定 Node 的事件流的通信通道。

回想一下,数据流就像管道图一样。当你在 YAML 中将 camera/image 连接到 object-detection/image 时,运行时并不会神奇地传输数据。它会在 camera 节点的输出机制和 object-detection 节点的事件流输入机制之间建立专用通道。

camera 头节点生成 image 输出时, dora 运行时会获取该数据,并将 INPUT 事件放置到事件流中 ,专门用于 object-detection 节点 。 object-detection 节点的代码在其 for event in node 循环中等待,然后接收此 INPUT 事件,从中获取图像数据,并开始处理。

STOPRELOAD 命令的工作原理类似。当您从 CLI 停止数据流时,运行时会向所有正在运行的节点和算子的事件流发送 STOP 事件。

以下是此交互的简化视图:

event-stream

该图显示,事件流充当中介,由 dora 运行时管理,将各种类型的事件传递给正在主动监听它们的 NodeOperator 进程。dora 优化该流的内部机制(通常使用共享内存来存储数据,我们将会看到),以提高性能。

总结

事件流dora 中的一个基本概念。它是节点和操作符接收其运行所需的所有信息的重要通信渠道,包括传入数据 ( INPUT )、输入源关闭信号 ( InputClosed ) 以及系统命令( STOPRELOAD )。通过监听此事件流,您的节点和操作符代码可以动态地响应数据流的状态。dora 运行管理这些事件流,并根据数据流蓝图将事件传递给相应的节点和操作符。

现在您已经了解了节点和操作符如何接收通知,让我们仔细看看 INPUT 事件中经常包含的 DataMessage/Arrow Data 数据 本身 。