查看源代码 如何为 Erlang 分布实现备用载体

本节介绍如何为 Erlang 分布实现备用载体协议。分布通常由 TCP/IP 承载。这里解释了一种用另一种协议替换 TCP/IP 的方法。

本节逐步解释了 uds_dist 示例应用程序(在 Kernel 应用程序 examples 目录中)。uds_dist 应用程序实现了通过 Unix 域套接字的分布,并且是为 Sun Solaris 2 操作系统环境编写的。然而,其机制是通用的,适用于 Erlang 运行的任何操作系统。C 代码没有做成可移植的原因仅仅是为了可读性。

简介

要为 Erlang 分布实现新的载体,主要步骤如下。

注意

从 ERTS 版本 10.0 开始,引入了对分布控制器进程的支持。也就是说,分布通道上的流量可以由进程而不是仅由端口管理。这使得可以在 Erlang 代码中实现大部分逻辑,您甚至可能不需要为该协议编写新的驱动程序。一个例子是使用 gen_udp 通过 UDP 进行 Erlang 分布(当然,在这种情况下,您的 Erlang 代码将必须处理重传等)。也就是说,根据您想做什么,您可能根本不需要实现驱动程序,然后可以跳过下面的驱动程序相关部分。分布模块部分中描述的 gen_tcp_disterl_uds_dist 示例使用了分布控制器进程,如果您想使用分布控制器进程,则值得一看。

编写 Erlang 驱动程序

首先,协议必须对 Erlang 机器可用,这涉及到编写 Erlang 驱动程序。不能使用端口程序,需要 Erlang 驱动程序。Erlang 驱动程序可以是

  • 静态链接到仿真器,当使用 Erlang 的开源发行版时,这可能是一种选择,或者
  • 动态加载到 Erlang 机器的地址空间中,如果要使用 Erlang 的预编译版本,这是唯一的选择

编写 Erlang 驱动程序并不容易。驱动程序被编写为一些回调函数,当数据发送到驱动程序或驱动程序在文件描述符上有任何可用数据时,由 Erlang 仿真器调用。由于驱动程序回调例程在 Erlang 机器的主线程中执行,因此回调函数不能执行任何阻塞活动。回调仅用于设置文件描述符以等待和/或读取/写入可用数据。所有 I/O 都必须是非阻塞的。但是,驱动程序回调是按顺序执行的,因此可以在例程中安全地更新全局状态。

为驱动程序编写 Erlang 接口

当实现驱动程序后,最好为驱动程序编写一个 Erlang 接口,以便能够单独测试驱动程序的功能。然后,该接口可以被分布模块使用,这将涵盖 net_kernel 中的协议细节。

最简单的方法是模仿 inetinet_tcp 接口,但那些模块中不需要实现太多功能。在示例应用程序中,只实现了少数常用接口,并且它们都进行了简化。

编写分布模块

当可以通过驱动程序和 Erlang 接口模块使协议对 Erlang 可用时,就可以编写分布模块。分布模块是一个具有定义良好的回调的模块,很像 gen_server(尽管没有编译器支持检查回调)。此模块实现

  • 查找其他节点的详细信息(即,与 epmd 或类似的东西通信)
  • 创建监听端口(或类似的东西)
  • 连接到其他节点
  • 执行握手/cookie 验证

但是,有一个实用模块 dist_util,它可以完成处理握手、cookie、计时器和滴答的大部分繁重工作。使用 dist_util 可以更容易地实现分布模块,这在示例应用程序中已经完成。

创建启动脚本

最后一步是创建启动脚本,以便在启动时使协议实现可用。可以通过在所有系统都运行时启动分布来调试实现,但在实际系统中,分布应该很早就启动,因此需要一个启动脚本和一些命令行参数。

此步骤还意味着接口和分布模块中的 Erlang 代码的编写方式应使其可以在启动阶段运行。特别是,不能调用 application 模块或任何未在启动时加载的模块。也就是说,只能使用 KernelSTDLIB 和应用程序本身。

分布模块

分布模块公开了一个 net_kernel 调用的 API,以便管理与其他节点的连接。模块名称应带有后缀 _dist

该模块需要创建某种监听实体(进程或端口)和一个接受进程,该进程使用监听实体接受传入的连接。对于每个连接,该模块至少需要创建一个连接 supervisor 进程,该进程还负责在建立连接时进行握手,以及一个负责通过连接传输数据的分布控制器(进程或端口)。分布控制器和连接 supervisor 进程应链接在一起,以便在连接被断开时将它们全部清理干净。

请注意,每个连接都需要恰好一个分布控制器。一个进程或端口只能作为一个连接的分布控制器。作为分布控制器的注册不能撤消。它将保持不变,直到分布控制器终止。分布控制器不应忽略退出信号。允许捕获退出,但收到退出信号时应自愿终止。

可以在 $ERL_TOP/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl 中找到分布模块的示例实现。它使用 gen_tcp API 通过 TCP/IP 实现分布,分布控制器由进程实现。这与使用端口分布控制器作为普通 TCP/IP 分布的方式不同。

可以在 $ERL_TOP/lib/kernel/examples/erl_uds_dist/src/erl_uds_dist.erl 中找到分布模块的另一个示例实现。它使用 gen_tcp API 通过 Unix 域套接字实现分布,分布控制器由进程实现。与使用 C 语言编写的端口驱动程序的原始 uds_dist 示例相比,erl_uds_dist 完全用 Erlang 编写。

导出的回调函数

以下函数是强制性的

  • listen(Name) ->
      {ok, {Listen, Address, Creation}} | {error, Error}
    listen(Name,Host) ->
      {ok, {Listen, Address, Creation}} | {error, Error}

    listen/2 在分布启动时被调用一次,以监听传入的连接请求。参数 Name 是完整节点名称中 @ 符号之前的部分。它可以是原子或字符串。参数 Host 是完整节点名称中 @ 符号之后的部分。它始终是一个字符串。

    返回值由 Listen 句柄(稍后传递给 accept/1 回调)、Address(这是一个包含节点地址信息的 #net_address{} 记录)和 Creation(当前)是一个整数 123 组成。(#net_address{} 记录在 kernel/include/net_address.hrl 中定义)。

    如果要将 epmd 用于节点发现,您通常希望使用 erl_epmd 模块(kernel 应用程序的一部分)来向 epmd 注册监听端口并检索要使用的 Creation

  • address() ->
      Address

    address/0 被调用以获取 listen/2 函数的 Address 部分,而无需创建监听套接字。除了 address 之外的所有字段都必须在返回的记录中设置

    示例

    address() ->
        {ok, Host} = inet:gethostname(),
        #net_address{ host = Host, protocol = tcp, family = inet6 }.
  • accept(Listen) ->
      AcceptorPid

    accept/1 应生成一个接受连接的进程。此进程最好以 max 优先级执行。应返回此进程的进程标识符。

    Listen 参数将与上面的 listen/1 回调的返回值中的 Listen 句柄部分相同。accept/1 仅在启动分布协议时调用一次。

    此函数的调用者是 net_kernel 的代表(这可能也可能不是注册为 net_kernel 的进程),并在本文档中标识为 Kernel。当 acceptor 进程接受连接后,它需要将接受的连接通知 Kernel。这是通过传递以下形式的消息来完成的

    Kernel ! {accept, AcceptorPid, DistController, Family, Proto}

    DistController 是连接的分布控制器的进程或端口标识符。分布控制器应在接受新连接时由 acceptor 进程创建。它的工作是分派连接上的流量。

    Kernel 返回以下消息之一

    • {Kernel, controller, SupervisorPid} - 请求已被接受,并且 SupervisorPid 是连接 supervisor 进程的进程标识符(该进程是在 accept_connection/5 回调中创建的)。

    • {Kernel, unsupported_protocol} - 请求被拒绝。这是一个致命错误。 acceptor 进程应该终止。

    当接受序列完成后,应该期望 acceptor 进程继续接受进一步的请求。

  • accept_connection(AcceptorPid, DistCtrl, MyNode, Allowed, SetupTime) ->
      ConnectionSupervisorPid

    accept_connection/5 应该生成一个进程,该进程将执行连接的 Erlang 分布握手。如果握手成功完成,它应该继续作为连接 supervisor 运行。该进程最好以 max 优先级执行,并且应该链接到调用者。可以调用 dist_util:net_ticker_spawn_options() 函数来获取适合此进程的 spawn 选项,这些选项可以直接传递给 erlang:spawn_opt/4dist_util:net_ticker_spawn_options() 默认将返回 [link, {priority, max}],但是允许用户使用 net_ticker_spawn_options 内核参数配置更多选项。应该返回此进程的进程标识符。

    参数

    • AcceptorPid - 由 accept/1 回调创建的进程的进程标识符。

    • DistCtrl - 由 acceptor 进程创建的分布控制器标识符。要传递给 dist_util:handshake_other_started(HsData)

    • MyNode - 此节点的节点名称。要传递给 dist_util:handshake_other_started(HsData)

    • Allowed - 要传递给 dist_util:handshake_other_started(HsData)

    • SetupTime - 通过调用 dist_util:start_timer(SetupTime) 创建设置定时器所用的时间。该定时器应传递给 dist_util:handshake_other_started(HsData)

    创建的进程应在 #hs_data{} 记录中提供握手所需的回调和其他信息,并使用此记录调用 dist_util:handshake_other_started(HsData)

    dist_util:handshake_other_started(HsData) 将执行握手,如果握手成功完成,则此进程将继续在连接 supervisor 循环中运行,只要连接处于活动状态。

  • setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
      ConnectionSupervisorPid

    setup/5 应该生成一个连接到 Node 的进程。建立连接后,它应该执行连接的 Erlang 分布握手。如果握手成功完成,它应该继续作为连接 supervisor 运行。该进程最好以 max 优先级执行,并且应该链接到调用者。可以调用 dist_util:net_ticker_spawn_options() 函数来获取适合此进程的 spawn 选项,这些选项可以直接传递给 erlang:spawn_opt/4dist_util:net_ticker_spawn_options() 默认将返回 [link, {priority, max}],但是允许用户使用 net_ticker_spawn_options 内核参数配置更多选项。应该返回此进程的进程标识符。

    参数

    • Node - 远程节点的节点名称。要传递给 dist_util:handshake_we_started(HsData)

    • Type - 连接类型。要传递给 dist_util:handshake_we_started(HsData)

    • MyNode - 此节点的节点名称。要传递给 dist_util:handshake_we_started(HsData)

    • LongOrShortNames - 原子 longnames 或原子 shortnames,表示使用长名称还是短名称。

    • SetupTime - 通过调用 dist_util:start_timer(SetupTime) 创建设置定时器所用的时间。该定时器应传递给 dist_util:handshake_we_started(HsData)

    此函数的调用者是 net_kernel 的代表(这可能是或可能不是注册为 net_kernel 的进程),并且在本文档中标识为 Kernel

    此函数除了生成连接 supervisor 外,还应该创建一个分布控制器。分布控制器是负责调度流量的进程或端口。

    创建的进程应在 #hs_data{} 记录中提供握手所需的回调和其他信息,并使用此记录调用 dist_util:handshake_we_started(HsData)

    dist_util:handshake_we_started(HsData) 将执行握手,如果握手成功完成,则此进程将继续在连接 supervisor 循环中运行,只要连接处于活动状态。

  • close(Listen) ->
      void()

    为了关闭最初从 listen/1 回调传递的 Listen 句柄而调用。

  • select(NodeName) ->
      boolean()

    如果 NodeName 的主机名部分对于此协议有效,则返回 true;否则,返回 false

还有两个可选的函数可以导出

  • setopts(Listen, Opts) ->
      ok | {error, Error}

    参数 Listen 是最初从 listen/1 回调传递的句柄。参数 Opts 是要在未来连接上设置的选项列表。

  • getopts(Listen, Opts) ->
      {ok, OptionValues} | {error, Error}

    参数 Listen 是最初从 listen/1 回调传递的句柄。参数 Opts 是要为未来连接读取的选项列表。

#hs_data{} 记录

dist_util:handshake_we_started/1dist_util:handshake_other_started/1 函数将 #hs_data{} 记录作为参数。此记录中有很多字段需要设置。该记录在 kernel/include/dist_util.hrl 中定义。未文档化的字段不应设置,即应保留为 undefined

除非另有说明,否则需要设置以下 #hs_data{} 记录字段

  • kernel_pid - Kernel 进程的进程标识符。也就是说,调用 setup/5accept_connection/5 的进程。

  • other_node - 另一个节点的名称。仅当此节点发起连接时,此字段才是强制性的。也就是说,当通过 setup/5 设置连接时。

  • this_node - 此节点的节点名称。

  • socket - 分布控制器的标识符。

  • timer - 使用 dist_util:start_timer/1 创建的计时器。

  • allowed - 作为 Allowed 传递给 accept_connection/5 的信息。仅当远程节点发起连接时,此字段才是强制性的。也就是说,当通过 accept_connection/5 设置连接时。

  • f_send - 具有以下签名的函数

    fun (DistCtrlr, Data) -> ok | {error, Error}

    其中 DistCtrlr 是分布控制器的标识符,Data 是要传递到另一侧的 io 数据。

    仅在握手阶段使用。

  • f_recv - 具有以下签名的函数

    fun (DistCtrlr, Length) -> {ok, Packet} | {error, Reason}

    其中 DistCtrlr 是分布控制器的标识符。如果 Length0,则应返回所有可用字节。如果 Length > 0,则应返回正好 Length 个字节,或返回错误;当从另一侧关闭连接时,可能会丢弃少于 Length 个字节的数据。它用于从另一端被动接收数据。

    仅在握手阶段使用。

  • f_setopts_pre_nodeup - 具有以下签名的函数

    fun (DistCtrlr) -> ok | {error, Error}

    其中 DistCtrlr 是分布控制器的标识符。在将分发通道用于正常流量之前调用。

    仅在握手阶段使用。

  • f_setopts_post_nodeup - 具有以下签名的函数

    fun (DistCtrlr) -> ok | {error, Error}

    其中 DistCtrlr 是分布控制器的标识符。在分发通道用于正常流量之后调用。

    仅在握手阶段使用。

  • f_getll - 具有以下签名的函数

    fun (DistCtrlr) -> ID

    其中 DistCtrlr 是分布控制器的标识符,ID 是处理连接的底层实体的标识符(通常是 DistCtrlr 本身)。

    仅在握手阶段使用。

  • f_address - 具有以下签名的函数

    fun (DistCtrlr, Node) -> NetAddress

    其中 DistCtrlr 是分布控制器的标识符,Node 是另一端的节点的节点名称,NetAddress 是一个 #net_address{} 记录,其中包含有关连接另一端 Node 的地址的信息。 #net_address{} 记录在 kernel/include/net_address.hrl 中定义。

    仅在握手阶段使用。

  • mf_tick - 具有以下签名的函数

    fun (DistCtrlr) -> void()

    其中 DistCtrlr 是分布控制器的标识符。此函数应通过连接发送另一端未解释的信息,同时增加另一端收到的数据包的统计信息。这通常通过发送空数据包来实现。

    注意

    至关重要的是,此操作不会长时间阻塞调用者。这是因为它从连接 supervisor 调用。

    连接活动时使用。

  • mf_getstat - 具有以下签名的函数

    fun (DistCtrlr) -> {ok, Received, Sent, PendSend}

    其中 DistCtrlr 是分布控制器的标识符,Received 是收到的数据包,Sent 是发送的数据包,PendSend 是要发送的队列中的数据量(通常以字节为单位,但是 dist_util 仅检查该值是否为非零以了解队列中是否有数据)或指示队列中是否有数据包要发送的 boolean/0

    注意

    至关重要的是,此操作不会长时间阻塞调用者。这是因为它从连接 supervisor 调用。

    连接活动时使用。

  • request_type - 传递给 setup/5 的请求 Type。仅当此节点发起连接时,此项才是强制性的。也就是说,通过 setup/5 设置连接时。

  • mf_setopts - 具有以下签名的函数

    fun (DistCtrl, Opts) -> ok | {error, Error}

    其中 DistCtrlr 是分布控制器的标识符,Opts 是要在连接上设置的选项列表。

    此函数是可选的。连接活动时使用。

  • mf_getopts - 具有以下签名的函数

    fun (DistCtrl, Opts) -> {ok, OptionValues} | {error, Error}

    其中 DistCtrlr 是分布控制器的标识符,Opts 是要为连接读取的选项列表。

    此函数是可选的。连接活动时使用。

  • f_handshake_complete - 具有以下签名的函数

    fun (DistCtrlr, Node, DHandle) -> void()

    其中 DistCtrlr 是分布控制器的标识符,Node 是连接在另一端的节点的节点名称,DHandle 是分布控制器进程在调用以下 BIF 时所需的分布句柄

    当握手完成后且分发通道建立时,会调用此函数。分发控制器可以开始通过该通道分发流量。此函数是可选的。

    仅在握手阶段使用。

  • add_flags - 要添加到连接的分发标志。目前,所有(非过时的)标志都将自动启用。

    此标志字段是可选的。

  • reject_flags - 要拒绝的分发标志。目前,可以拒绝以下分发标志:

    • DFLAG_DIST_HDR_ATOM_CACHE - 不要在此连接上使用原子缓存。

    • DFLAG_FRAGMENTS - 将大型分发消息拆分为多个片段。

    此标志字段是可选的。

    另请参阅分发数据传递

  • require_flags - 要求使用的这些分发标志。如果另一端不使用它们,则连接将在握手期间中止。

    此标志字段是可选的。

分发数据传递

当使用默认配置时,需要将通过连接传递的数据,按照完全相同的顺序,原样传递到接收端的节点,并且不能有任何数据丢失,就像从发送节点发送的一样。

可以通过禁用需要严格排序的功能来放宽数据传递顺序。这可以通过在建立连接时使用的#hs_data{}记录的reject_flags字段中传递由dist_util:strict_order_flags/0返回的分发标志来实现。当使用放宽的排序时,只需要保留具有相同发送者/接收者对的信号顺序。但是,请注意,禁用需要严格排序的功能可能会对性能、吞吐量和/或延迟产生负面影响。

启用你的分发模块

为了让net_kernel找出要使用的分发模块,需要使用erl命令行参数-proto_dist。它后面跟一个或多个分发模块名称,删除了后缀“_dist”。也就是说,将gen_tcp_dist作为分发模块指定为-proto_dist gen_tcp

如果没有使用epmd(TCP端口映射器守护进程),还需要指定命令行选项-no_epmd,这使得Erlang跳过epmd的启动,无论是作为操作系统进程还是作为Erlang副本。

驱动程序

注意

本节是在很久以前编写的。其中大部分仍然有效,但自那时以来,一些事情已经发生了变化。这里介绍的驱动程序的文档已经进行了一些更新,但是可以做更多的工作,并且计划在未来进行。建议读者阅读erl_driverdriver_entry文档。

尽管Erlang驱动程序通常可以超出本节的范围,但简要介绍似乎是合适的。

通用驱动程序

Erlang驱动程序是用C(或汇编程序)编写的本机代码模块,它充当某些特殊操作系统服务的接口。这是一种通用机制,在整个Erlang模拟器中用于各种I/O。可以使用erl_ddll Erlang模块在运行时将Erlang驱动程序动态链接(或加载)到Erlang模拟器。但是,OTP中的某些驱动程序是静态链接到运行时系统的,但这更多是一种优化,而不是必需的。

驱动程序数据类型和可供驱动程序编写者使用的函数在Erlang的include目录中的头文件erl_driver.h中定义。有关哪些函数可用的详细信息,请参阅erl_driver文档。

在编写驱动程序以使通信协议可供Erlang使用时,应该了解有关该特定协议的所有值得了解的内容。所有操作都必须是非阻塞的,并且必须在驱动程序中考虑所有可能的情况。不稳定的驱动程序将影响和/或使整个Erlang运行时系统崩溃。

模拟器在以下情况下调用驱动程序:

  • 加载驱动程序时。此回调必须具有特殊名称,并通过返回指向ErlDrvEntry结构的指针来通知模拟器要使用哪些回调,该结构需要正确填写(见下文)。
  • 当打开到驱动程序的端口时(通过Erlang中的open_port调用)。此例程用于设置内部数据结构并返回ErlDrvData类型的opaque数据实体,该数据类型足够大以容纳指针。此函数返回的指针是有关此特定端口的所有其他回调的第一个参数。它通常被称为端口句柄。模拟器仅存储句柄,从不尝试解释它,因此它可以是几乎任何东西(任何不大于指针的东西),如果它是指针,则可以指向任何东西。通常,此指针引用一个结构,该结构包含有关特定端口的信息,就像示例中一样。
  • 当Erlang进程将数据发送到端口时。数据以字节缓冲区形式到达,解释未定义,但取决于实现者。此回调不向调用者返回任何内容,答案以消息的形式发送给调用者(使用一个名为driver_output的例程,该例程可供所有驱动程序使用)。还有一种与驱动程序进行同步通信的方式,如下所述。可以有一个额外的回调函数来处理已分片的数据(在深层io-list中发送)。该接口以适合Unix writev的形式获取数据,而不是单个缓冲区。分发驱动程序不需要实现这样的回调,因此我们不会这样做。
  • 当文件描述符发出输入信号时。当模拟器检测到驱动程序已标记为使用driver_select接口进行监视的文件描述符上的输入时,将调用此回调。驱动程序选择的机制可以通过在需要读取时调用driver_select来从文件描述符进行非阻塞读取,然后在此回调中进行读取(当可以读取时)。典型的场景是,当Erlang进程命令读取操作时调用driver_select,并且当文件描述符上有可用数据时,此例程会发送答案。
  • 当文件描述符发出输出信号时。此回调的调用方式与上一个类似,但是当可以写入文件描述符时。通常的情况是,Erlang命令在文件描述符上进行写入,并且驱动程序调用driver_select。当描述符准备好输出时,将调用此回调,驱动程序可以尝试发送输出。此类操作可能涉及排队,并且有方便的队列例程可供驱动程序编写者使用。
  • 当端口关闭时,无论是通过Erlang进程还是通过驱动程序调用driver_failure_XXX例程之一。此例程用于清理与特定端口相关的所有内容。当其他回调调用driver_failure_XXX例程时,将立即调用此例程。发出错误的调回例程不能再使用端口的数据结构,因为此例程肯定已释放所有关联的数据并关闭了所有文件描述符。但是,如果使用可供驱动程序编写者使用的队列实用程序,则直到队列为空,才会调用此例程。
  • 当Erlang进程调用erlang:port_control/3时,这是驱动程序的同步接口。控制接口用于设置驱动程序选项、更改端口状态等等。此接口在示例中大量使用。
  • 当计时器过期时。驱动程序可以使用函数driver_set_timer设置计时器。当此类计时器过期时,将调用特定的回调函数。示例中未使用任何计时器。
  • 当整个驱动程序被卸载时。驱动程序分配的每个资源都将被释放。

分发驱动程序的数据结构

用于Erlang分发的驱动程序需要实现可靠、保持顺序、可变长度的面向数据包的协议。所有错误纠正、重发等都需要在驱动程序中或由底层通信协议来实现。如果协议是面向流的(例如TCP/IP和我们的流式Unix域套接字),则需要某种打包机制。我们将使用一种简单的方法,即使用一个包含数据包长度的四个字节的标头(以大端32位整数表示)。由于Unix域套接字只能在同一台机器上的进程之间使用,因此我们不需要以某种特殊的字节序来编码整数,但无论如何我们都会这样做,因为在大多数情况下你需要这样做。Unix域套接字是可靠且保持顺序的,因此我们不需要在驱动程序中实现重发等功能。

我们首先通过声明原型并填写静态的ErlDrvEntry结构来编写示例Unix域套接字驱动程序。

( 1) #include <stdio.h>
( 2) #include <stdlib.h>
( 3) #include <string.h>
( 4) #include <unistd.h>
( 5) #include <errno.h>
( 6) #include <sys/types.h>
( 7) #include <sys/stat.h>
( 8) #include <sys/socket.h>
( 9) #include <sys/un.h>
(10) #include <fcntl.h>

(11) #define HAVE_UIO_H
(12) #include "erl_driver.h"

(13) /*
(14) ** Interface routines
(15) */
(16) static ErlDrvData uds_start(ErlDrvPort port, char *buff);
(17) static void uds_stop(ErlDrvData handle);
(18) static void uds_command(ErlDrvData handle, char *buff, int bufflen);
(19) static void uds_input(ErlDrvData handle, ErlDrvEvent event);
(20) static void uds_output(ErlDrvData handle, ErlDrvEvent event);
(21) static void uds_finish(void);
(22) static int uds_control(ErlDrvData handle, unsigned int command,
(23)                        char* buf, int count, char** res, int res_size);

(24) /* The driver entry */
(25) static ErlDrvEntry uds_driver_entry = {
(26)     NULL,                            /* init, N/A */
(27)     uds_start,                       /* start, called when port is opened */
(28)     uds_stop,                        /* stop, called when port is closed */
(29)     uds_command,                     /* output, called when erlang has sent */
(30)     uds_input,                       /* ready_input, called when input
(31)                                         descriptor ready */
(32)     uds_output,                      /* ready_output, called when output
(33)                                         descriptor ready */
(34)     "uds_drv",                       /* char *driver_name, the argument
(35)                                         to open_port */
(36)     uds_finish,                      /* finish, called when unloaded */
(37)     NULL,                            /* void * that is not used (BC) */
(38)     uds_control,                     /* control, port_control callback */
(39)     NULL,                            /* timeout, called on timeouts */
(40)     NULL,                            /* outputv, vector output interface */
(41)     NULL,                            /* ready_async callback */
(42)     NULL,                            /* flush callback */
(43)     NULL,                            /* call callback */
(44)     NULL,                            /* event callback */
(45)     ERL_DRV_EXTENDED_MARKER,         /* Extended driver interface marker */
(46)     ERL_DRV_EXTENDED_MAJOR_VERSION,  /* Major version number */
(47)     ERL_DRV_EXTENDED_MINOR_VERSION,  /* Minor version number */
(48)     ERL_DRV_FLAG_SOFT_BUSY,          /* Driver flags. Soft busy flag is
(49)                                         required for distribution drivers */
(50)     NULL,                            /* Reserved for internal use */
(51)     NULL,                            /* process_exit callback */
(52)     NULL                             /* stop_select callback */
(53) };

在第1-10行中,包含了驱动程序所需的操作系统头文件。由于此驱动程序是为Solaris编写的,因此我们知道头文件uio.h存在。因此,可以在第12行包含erl_driver.h之前定义预处理器变量HAVE_UIO_HHAVE_UIO_H的定义将使Erlang驱动程序队列中使用的I/O向量与操作系统的副本相对应,这非常方便。

在第 16-23 行,声明了不同的回调函数(“前向声明”)。

静态链接驱动和动态加载驱动的驱动结构类似。但是,在不同类型的驱动中,某些字段应该留空(即初始化为 NULL)。第一个字段(init 函数指针)在动态加载驱动中始终留空,见第 26 行。第 37 行的 NULL 始终应该存在,该字段不再使用,保留是为了向后兼容。此驱动程序中没有使用定时器,因此不需要定时器的回调。 outputv 字段(第 40 行)可用于实现类似于 Unix writev 的输出接口。Erlang 运行时系统以前不能将 outputv 用于分发,但自 ERTS 5.7.2 起可以。由于此驱动程序是在 ERTS 5.7.2 之前编写的,因此它不使用 outputv 回调。使用 outputv 回调是首选,因为它减少了数据复制。(但是,我们将在驱动程序内部使用散布/收集 I/O。)

从 ERTS 5.5.3 开始,驱动程序接口扩展了版本控制和传递功能信息的可能性。功能标志位于第 48 行。从 ERTS 5.7.4 开始,分发使用的驱动程序需要 ERL_DRV_FLAG_SOFT_BUSY 标志。软忙标志意味着,尽管驱动程序已将自身标记为忙,但它可以处理对 outputoutputv 回调的调用。这一直是分发使用的驱动程序的要求,但以前没有关于此的功能信息。有关更多信息,请参阅 erl_driver:set_busy_port())。

此驱动程序是在运行时系统支持 SMP 之前编写的。该驱动程序仍将在具有 SMP 支持的运行时系统中运行,但是性能将受到驱动程序锁上锁争用的影响。可以通过检查并可能重写代码来缓解这种情况,以便驱动程序的每个实例都可以安全地并行执行。当实例可以安全地并行执行时,可以安全地在驱动程序上启用实例特定的锁定。这可以通过将 ERL_DRV_FLAG_USE_PORT_LOCKING 作为驱动程序标志传递来完成。这留给读者作为练习。

因此,定义的回调如下:

  • uds_start - 必须为端口启动数据。我们这里不创建任何套接字,仅初始化数据结构。

  • uds_stop - 在端口关闭时调用。

  • uds_command - 处理来自 Erlang 的消息。消息可以是待发送的纯数据,也可以是发送给驱动程序的更细微的指令。此函数主要用于数据泵送。

  • uds_input - 当有数据可从套接字读取时调用。

  • uds_output - 当可以写入套接字时调用。

  • uds_finish - 在卸载驱动程序时调用。永远不会卸载分发驱动程序,但我们包括此项以保持完整性。能够自行清理始终是一件好事。

  • uds_control - erlang:port_control/3 回调,在此实现中大量使用。

此驱动程序实现的端口以两种主要模式运行,分别名为 commanddata。在 command 模式下,只能进行被动读写(如 gen_tcp:recv/gen_tcp:send)。端口在分发握手期间处于此模式。当连接建立时,端口切换到 data 模式,所有数据都会立即读取并进一步传递给 Erlang 模拟器。在 data 模式下,到达 uds_command 的任何数据都不会被解释,仅被打包并通过套接字发送出去。uds_control 回调在两种模式之间切换。

net_kernel 通知不同的子系统连接即将建立时,端口应接受要发送的数据。但是,端口不应接收任何数据,以避免在每个内核子系统准备好处理数据之前,数据从另一个节点到达。第三种模式,名为 intermediate,用于此中间阶段。

为不同类型的端口定义了一个枚举

( 1) typedef enum {
( 2)     portTypeUnknown,      /* An uninitialized port */
( 3)     portTypeListener,     /* A listening port/socket */
( 4)     portTypeAcceptor,     /* An intermediate stage when accepting
( 5)                              on a listen port */
( 6)     portTypeConnector,    /* An intermediate stage when connecting */
( 7)     portTypeCommand,      /* A connected open port in command mode */
( 8)     portTypeIntermediate, /* A connected open port in special
( 9)                              half active mode */
(10)     portTypeData          /* A connected open port in data mode */
(11) } PortType;

不同的类型如下:

  • portTypeUnknown - 端口打开但未绑定到任何文件描述符时,端口的类型。

  • portTypeListener - 连接到侦听套接字的端口。此端口没有太多作用,此套接字上不进行数据泵送,但是当尝试在端口上执行 accept 操作时,可以读取数据。

  • portTypeAcceptor - 此端口表示 accept 操作的结果。它在想要从侦听套接字接受连接时创建,并在 accept 成功时转换为 portTypeCommand

  • portTypeConnector - 与 portTypeAcceptor 非常相似,是请求连接操作与套接字连接到另一端的接受端口之间的中间阶段。当套接字连接时,端口的类型切换为 portTypeCommand

  • portTypeCommand - 如前所述,处于 command 模式下的已连接套接字(或已接受的套接字)。

  • portTypeIntermediate - 已连接套接字的中间阶段。此套接字不进行任何输入处理。

  • portTypeData - 数据通过端口泵送的模式,并且 uds_command 例程将每次调用都视为需要发送的调用。在此模式下,当输入到达套接字时,所有可用的输入都会被读取并发送到 Erlang,这与 gen_tcp 套接字的活动模式非常相似。

我们研究端口所需的状态。请注意,并非所有类型的端口都使用所有字段。可以通过使用联合来节省一些空间,但这会使代码因多个间接寻址而变得混乱,因此这里为所有类型的端口使用一个结构,以提高可读性

( 1) typedef unsigned char Byte;
( 2) typedef unsigned int Word;

( 3) typedef struct uds_data {
( 4)     int fd;                   /* File descriptor */
( 5)     ErlDrvPort port;          /* The port identifier */
( 6)     int lockfd;               /* The file descriptor for a lock file in
( 7)                                  case of listen sockets */
( 8)     Byte creation;            /* The creation serial derived from the
( 9)                                  lock file */
(10)     PortType type;            /* Type of port */
(11)     char *name;               /* Short name of socket for unlink */
(12)     Word sent;                /* Bytes sent */
(13)     Word received;            /* Bytes received */
(14)     struct uds_data *partner; /* The partner in an accept/listen pair */
(15)     struct uds_data *next;    /* Next structure in list */
(16)     /* The input buffer and its data */
(17)     int buffer_size;          /* The allocated size of the input buffer */
(18)     int buffer_pos;           /* Current position in input buffer */
(19)     int header_pos;           /* Where the current header is in the
(20)                                  input buffer */
(21)     Byte *buffer;             /* The actual input buffer */
(22) } UdsData;

尽管某些字段对于某些类型没有用处,但此结构用于所有类型的端口。内存消耗最少的解决方案是将此结构安排为结构体的联合。但是,代码中访问此类结构中的字段的多个间接寻址会使代码过于混乱,不适合作为示例。

结构中的字段如下:

  • fd - 与端口关联的套接字的文件描述符。

  • port - 此结构对应的端口的端口标识符。从驱动程序向仿真器的大多数 driver_XXX 调用都需要它。

  • lockfd - 如果套接字是侦听套接字,我们将使用单独的(常规)文件用于两个目的

    • 我们需要一个不会产生竞争条件的锁定机制,以确保如果另一个 Erlang 节点使用我们需要的侦听套接字名称,或者该文件仅是从之前的(崩溃的)会话中留下的。

    • 我们将 creation 序列号存储在文件中。creation 是一个数字,在具有相同名称的不同 Erlang 仿真器的不同实例之间会发生变化,因此来自一个仿真器的进程标识符在发送到具有相同分发名称的新仿真器时不会变为有效。创建号可以是 0 到 3(两位),并存储在发送到另一个节点的每个进程标识符中。

      在具有基于 TCP 的分发的系统中,此数据保存在 Erlang 端口映射器守护进程 (epmd) 中,当分布式节点启动时,会联系该守护进程。当使用此分发模块时,锁定文件和 UDS 侦听套接字名称的约定消除了对 epmd 的需求。UDS 始终限制为一台主机,因此避免使用端口映射器很容易。

  • creation - 侦听套接字的创建号,计算为(在锁文件中找到的值 + 1)rem 4。此创建值也会写回到锁文件中,以便仿真器的下一次调用可以在文件中找到我们的值。

  • type - 端口的当前类型/状态,可以是上面声明的值之一。

  • name - 套接字文件的名称(删除了路径前缀),这允许在套接字关闭时删除(unlink)。

  • sent - 通过套接字发送的字节数。它可以回绕,但这对分发没有问题,因为 Erlang 分发仅对该值是否已更改感兴趣。(Erlang net_kernel ticker 通过调用驱动程序来获取此值来使用此值,这通过 erlang:port_control/3 例程完成。)

  • received - 从套接字读取(接收)的字节数,其使用方式与 sent 类似。

  • partner - 指向另一个端口结构的指针,该结构要么是此端口正在从中接受连接的侦听端口,反之亦然。“伙伴关系”始终是双向的。

  • next - 指向所有端口结构链表中下一个结构的指针。此列表在接受连接和卸载驱动程序时使用。

  • buffer_sizebuffer_posheader_posbuffer - 用于输入缓冲的数据。有关输入缓冲的详细信息,请参阅目录 kernel/examples 中的源代码。这肯定超出了本节的范围。

分发驱动程序实现的选定部分

此处未完全介绍分发驱动程序的实现,未解释有关缓冲和其他与驱动程序编写无关的事项的详细信息。同样,未详细解释 UDS 协议的一些特性。选择的协议并不重要。

可以在 erl_driver.h 头文件中找到驱动程序回调例程的原型。

驱动程序初始化例程(通常)使用宏声明,以便使驱动程序更容易在不同的操作系统(和系统风格)之间移植。这是唯一必须具有明确名称的例程。所有其他回调都通过驱动程序结构访问。要使用的宏名为 DRIVER_INIT,它将驱动程序名称作为参数

(1) /* Beginning of linked list of ports */
(2) static UdsData *first_data;

(3) DRIVER_INIT(uds_drv)
(4) {
(5)     first_data = NULL;
(6)     return &uds_driver_entry;
(7) }

该例程初始化单个全局数据结构,并返回指向驱动程序条目的指针。从 Erlang 调用 erl_ddll:load_driver 时,会调用该例程。

当从 Erlang 打开端口时,会调用 uds_start 例程。在这种情况下,我们只分配一个结构并初始化它。创建实际的套接字留给 uds_command 例程。

( 1) static ErlDrvData uds_start(ErlDrvPort port, char *buff)
( 2) {
( 3)     UdsData *ud;
( 4)
( 5)     ud = ALLOC(sizeof(UdsData));
( 6)     ud->fd = -1;
( 7)     ud->lockfd = -1;
( 8)     ud->creation = 0;
( 9)     ud->port = port;
(10)     ud->type = portTypeUnknown;
(11)     ud->name = NULL;
(12)     ud->buffer_size = 0;
(13)     ud->buffer_pos = 0;
(14)     ud->header_pos = 0;
(15)     ud->buffer = NULL;
(16)     ud->sent = 0;
(17)     ud->received = 0;
(18)     ud->partner = NULL;
(19)     ud->next = first_data;
(20)     first_data = ud;
(21)
(22)     return((ErlDrvData) ud);
(23) }

每个数据项都会被初始化,以便在关闭新创建的端口(而没有任何相应的套接字)时不会出现问题。当从 Erlang 调用 open_port({spawn, "uds_drv"},[]) 时,会调用此例程。

当 Erlang 进程向端口发送数据时,会调用 uds_command 例程。当端口处于 command 模式时,此例程处理所有异步命令;当端口处于 data 模式时,此例程处理所有数据的发送。

( 1) static void uds_command(ErlDrvData handle, char *buff, int bufflen)
( 2) {
( 3)     UdsData *ud = (UdsData *) handle;

( 4)     if (ud->type == portTypeData || ud->type == portTypeIntermediate) {
( 5)         DEBUGF(("Passive do_send %d",bufflen));
( 6)         do_send(ud, buff + 1, bufflen - 1); /* XXX */
( 7)         return;
( 8)     }
( 9)     if (bufflen == 0) {
(10)         return;
(11)     }
(12)     switch (*buff) {
(13)     case 'L':
(14)         if (ud->type != portTypeUnknown) {
(15)             driver_failure_posix(ud->port, ENOTSUP);
(16)             return;
(17)         }
(18)         uds_command_listen(ud,buff,bufflen);
(19)         return;
(20)     case 'A':
(21)         if (ud->type != portTypeUnknown) {
(22)             driver_failure_posix(ud->port, ENOTSUP);
(23)             return;
(24)         }
(25)         uds_command_accept(ud,buff,bufflen);
(26)         return;
(27)     case 'C':
(28)         if (ud->type != portTypeUnknown) {
(29)             driver_failure_posix(ud->port, ENOTSUP);
(30)             return;
(31)         }
(32)         uds_command_connect(ud,buff,bufflen);
(33)         return;
(34)     case 'S':
(35)         if (ud->type != portTypeCommand) {
(36)             driver_failure_posix(ud->port, ENOTSUP);
(37)             return;
(38)         }
(39)         do_send(ud, buff + 1, bufflen - 1);
(40)         return;
(41)     case 'R':
(42)         if (ud->type != portTypeCommand) {
(43)             driver_failure_posix(ud->port, ENOTSUP);
(44)             return;
(45)         }
(46)         do_recv(ud);
(47)         return;
(48)     default:
(49)         return;
(50)     }
(51) }

命令例程接受三个参数:由 uds_start 为端口返回的句柄(指向内部端口结构的指针)、数据缓冲区和数据缓冲区的长度。缓冲区是从 Erlang 发送的数据(字节列表)转换为 C 数组(字节数组)。

例如,如果 Erlang 将列表 [$a,$b,$c] 发送到端口,则 bufflen 变量为 3buff 变量包含 {'a','b','c'}(没有 NULL 终止符)。通常,第一个字节用作操作码,此驱动程序也是如此(至少当端口处于 command 模式时)。操作码定义如下:

  • 'L'<socket name> - 创建并监听具有指定名称的套接字。

  • 'A'<listen number as 32-bit big-endian> - 从指定标识号标识的监听套接字接受连接。标识号通过 uds_control 例程检索。

  • 'C'<socket name> - 连接到名为 <socket name> 的套接字。

  • 'S'<data> - 在已连接/已接受的套接字上发送数据 <data>(在 command 模式下)。当数据离开此进程时,会确认发送。

  • 'R' - 接收一个数据包。

命令 'R' 中的“一个数据包”可以解释如下。此驱动程序始终发送带有 4 字节头的数据包,该头部包含一个大端 32 位整数,表示数据包中数据的长度。由于此驱动程序仅用于分发,因此无需使用不同的数据包大小或某种流模式。当 UDS 套接字位于本地主机时,为什么头部字会被显式编码为大端?编写分发驱动程序时,这是一个好的做法,因为实践中的分发通常会跨越主机边界。

第 4-8 行处理端口处于 data 模式或 intermediate 模式的情况,其余例程处理不同的命令。该例程使用 driver_failure_posix() 例程报告错误(例如,参见第 15 行)。请注意,失败例程会调用 uds_stop 例程,这将删除内部端口数据。因此,在调用 driver_failure 后,句柄(以及强制转换后的句柄 ud)是无效指针,我们应该立即返回。运行时系统会将退出信号发送到所有链接的进程。

当先前传递给 driver_select 例程的文件描述符上有数据可用时,会调用 uds_input 例程。这通常发生在发出读取命令且没有数据可用时。 do_recv 例程如下:

( 1) static void do_recv(UdsData *ud)
( 2) {
( 3)     int res;
( 4)     char *ibuf;
( 5)     for(;;) {
( 6)         if ((res = buffered_read_package(ud,&ibuf)) < 0) {
( 7)             if (res == NORMAL_READ_FAILURE) {
( 8)                 driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 1);
( 9)             } else {
(10)                 driver_failure_eof(ud->port);
(11)             }
(12)             return;
(13)         }
(14)         /* Got a package */
(15)         if (ud->type == portTypeCommand) {
(16)             ibuf[-1] = 'R'; /* There is always room for a single byte
(17)                                opcode before the actual buffer
(18)                                (where the packet header was) */
(19)             driver_output(ud->port,ibuf - 1, res + 1);
(20)             driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ,0);
(21)             return;
(22)         } else {
(23)             ibuf[-1] = DIST_MAGIC_RECV_TAG; /* XXX */
(24)             driver_output(ud->port,ibuf - 1, res + 1);
(25)             driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ,1);
(26)         }
(27)     }
(28) }

该例程尝试读取数据,直到读取一个数据包或 buffered_read_package 例程返回 NORMAL_READ_FAILURE(模块内部定义的常量,表示读取操作导致 EWOULDBLOCK)。如果端口处于 command 模式,则在读取一个数据包时停止读取。如果端口处于 data 模式,则读取会持续到套接字缓冲区为空(读取失败)。如果没有更多数据可读取并且需要更多数据(当套接字处于 data 模式时总是如此),则会调用 driver_select,以便在有更多数据可供读取时调用 uds_input 回调。

当端口处于 data 模式时,所有数据都以适合分发的格式发送到 Erlang。实际上,原始数据永远不会到达任何 Erlang 进程,而是会由模拟器本身进行转换/解释,然后以正确的格式传递给正确的进程。在当前模拟器版本中,接收到的数据将用一个字节 100 进行标记。这就是宏 DIST_MAGIC_RECV_TAG 的定义。将来可能会更改分发中数据的标记方式。

uds_input 例程处理其他输入事件(如非阻塞 accept),但最重要的是通过调用 do_recv 来处理到达套接字的数据。

( 1) static void uds_input(ErlDrvData handle, ErlDrvEvent event)
( 2) {
( 3)     UdsData *ud = (UdsData *) handle;

( 4)     if (ud->type == portTypeListener) {
( 5)         UdsData *ad = ud->partner;
( 6)         struct sockaddr_un peer;
( 7)         int pl = sizeof(struct sockaddr_un);
( 8)         int fd;

( 9)         if ((fd = accept(ud->fd, (struct sockaddr *) &peer, &pl)) < 0) {
(10)             if (errno != EWOULDBLOCK) {
(11)                 driver_failure_posix(ud->port, errno);
(12)                 return;
(13)             }
(14)             return;
(15)         }
(16)         SET_NONBLOCKING(fd);
(17)         ad->fd = fd;
(18)         ad->partner = NULL;
(19)         ad->type = portTypeCommand;
(20)         ud->partner = NULL;
(21)         driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 0);
(22)         driver_output(ad->port, "Aok",3);
(23)         return;
(24)     }
(25)     do_recv(ud);
(26) }

函数中重要的一行是最后一行:调用 do_read 例程来处理新的输入。其余函数处理监听套接字上的输入,这意味着可以在套接字上执行 accept 操作,这也将被识别为读取事件。

输出机制与输入类似。do_send 例程如下:

( 1) static void do_send(UdsData *ud, char *buff, int bufflen)
( 2) {
( 3)     char header[4];
( 4)     int written;
( 5)     SysIOVec iov[2];
( 6)     ErlIOVec eio;
( 7)     ErlDrvBinary *binv[] = {NULL,NULL};

( 8)     put_packet_length(header, bufflen);
( 9)     iov[0].iov_base = (char *) header;
(10)     iov[0].iov_len = 4;
(11)     iov[1].iov_base = buff;
(12)     iov[1].iov_len = bufflen;
(13)     eio.iov = iov;
(14)     eio.binv = binv;
(15)     eio.vsize = 2;
(16)     eio.size = bufflen + 4;
(17)     written = 0;
(18)     if (driver_sizeq(ud->port) == 0) {
(19)         if ((written = writev(ud->fd, iov, 2)) == eio.size) {
(20)             ud->sent += written;
(21)             if (ud->type == portTypeCommand) {
(22)                 driver_output(ud->port, "Sok", 3);
(23)             }
(24)             return;
(25)         } else if (written < 0) {
(26)             if (errno != EWOULDBLOCK) {
(27)                 driver_failure_eof(ud->port);
(28)                 return;
(29)             } else {
(30)                 written = 0;
(31)             }
(32)         } else {
(33)             ud->sent += written;
(34)         }
(35)         /* Enqueue remaining */
(36)     }
(37)     driver_enqv(ud->port, &eio, written);
(38)     send_out_queue(ud);
(39) }

此驱动程序使用 writev 系统调用将数据发送到套接字。 writev 和驱动程序输出队列的组合非常方便。 ErlIOVec 结构包含一个 SysIOVec(等效于 uio.h 中定义的 struct iovec 结构)。 ErlIOVec 还包含一个 ErlDrvBinary 指针数组,其长度与 I/O 向量本身的缓冲区数量相同。可以使用此方法在驱动程序中“手动”为队列分配二进制文件,但此处二进制数组填充了 NULL 值(第 7 行)。当调用 driver_enqv 时(第 37 行),运行时系统会分配自己的缓冲区。

该例程构建一个包含头字节和缓冲区的 I/O 向量(操作码已被删除,缓冲区长度已通过输出例程减小)。如果队列为空,我们会将数据直接写入套接字(或至少尝试写入)。如果剩余任何数据,则将其存储在队列中,然后我们尝试发送队列(第 38 行)。当消息完全传递时,会发送确认(第 22 行)。如果发送在那里完成,则 send_out_queue 会发送确认。如果端口处于 command 模式,Erlang 代码会序列化发送操作,以便一次只能有一个数据包等待传递。因此,只要队列为空,就可以发送确认。

send_out_queue 例程如下:

( 1) static int send_out_queue(UdsData *ud)
( 2) {
( 3)     for(;;) {
( 4)         int vlen;
( 5)         SysIOVec *tmp = driver_peekq(ud->port, &vlen);
( 6)         int wrote;
( 7)         if (tmp == NULL) {
( 8)             driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_WRITE, 0);
( 9)             if (ud->type == portTypeCommand) {
(10)                 driver_output(ud->port, "Sok", 3);
(11)             }
(12)             return 0;
(13)         }
(14)         if (vlen > IO_VECTOR_MAX) {
(15)             vlen = IO_VECTOR_MAX;
(16)         }
(17)         if ((wrote = writev(ud->fd, tmp, vlen)) < 0) {
(18)             if (errno == EWOULDBLOCK) {
(19)                 driver_select(ud->port, (ErlDrvEvent) ud->fd,
(20)                               DO_WRITE, 1);
(21)                 return 0;
(22)             } else {
(23)                 driver_failure_eof(ud->port);
(24)                 return -1;
(25)             }
(26)         }
(27)         driver_deq(ud->port, wrote);
(28)         ud->sent += wrote;
(29)     }
(30) }

我们只是从队列中选择一个 I/O 向量(整个队列作为一个 SysIOVec)。如果 I/O 向量太长(IO_VECTOR_MAX 定义为 16),则会减小向量长度(第 15 行),否则 writev 调用(第 17 行)会失败。尝试进行写入,并将写入的任何内容出队(第 27 行)。如果写入因 EWOULDBLOCK 而失败(请注意,所有套接字都处于非阻塞模式),则会调用 driver_select,以便在有空间再次写入时调用 uds_output 例程。

我们会继续尝试写入,直到队列为空或写入阻塞为止。

以上例程是从 uds_output 例程调用的:

( 1) static void uds_output(ErlDrvData handle, ErlDrvEvent event)
( 2) {
( 3)    UdsData *ud = (UdsData *) handle;
( 4)    if (ud->type == portTypeConnector) {
( 5)        ud->type = portTypeCommand;
( 6)        driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_WRITE, 0);
( 7)        driver_output(ud->port, "Cok",3);
( 8)        return;
( 9)    }
(10)    send_out_queue(ud);
(11) }

该例程很简单:它首先处理输出选择将涉及正在连接的套接字(并且连接被阻止)的事实。如果套接字处于连接状态,则只需发送输出队列。当可以写入具有输出队列的套接字时会调用此例程,因此没有疑问该怎么做。

该驱动程序实现了一个控制接口,这是一个同步接口,当 Erlang 调用 erlang:port_control/3 时会调用该接口。只有此接口才能在驱动程序处于 data 模式时控制驱动程序。可以使用以下操作码调用它:

  • 'C' - 将端口设置为 command 模式。

  • 'I' - 将端口设置为 intermediate 模式。

  • 'D' - 将端口设置为 data 模式。

  • 'N' - 获取监听端口的标识号。此标识号用于驱动程序的接受命令中。它以大端 32 位整数返回,该整数是监听套接字的文件标识符。

  • 'S' - 获取统计信息,包括接收的字节数、发送的字节数以及输出队列中待处理的字节数。当分发检查连接是否处于活动状态(心跳)时,会使用此数据。统计信息以三个大端 32 位整数返回。

  • 'T' - 发送心跳消息,这是一个长度为 0 的数据包。当端口处于 data 模式时会执行心跳操作,因此不能使用发送数据的命令(此外,它会忽略 command 模式下的零长度数据包)。当没有其他流量时,心跳器会使用此方法发送虚拟数据。

    注意:重要的是,发送心跳的接口不是阻塞的。此实现使用 erlang:port_control/3,它不会阻塞调用者。如果使用 erlang:port_command,请使用 erlang:port_command/3 并将 [force] 作为选项列表传递;否则,调用者可能会在繁忙端口上无限期地阻塞,并阻止系统关闭无法正常运行的连接。

  • 'R' - 获取监听套接字的创建编号,该编号用于挖掘存储在锁定文件中的编号,以区分具有相同名称的 Erlang 节点的调用。

控制接口会获取一个缓冲区来返回其值,但如果提供的缓冲区太小,可以自由分配自己的缓冲区。uds_control 代码如下:

( 1) static int uds_control(ErlDrvData handle, unsigned int command,
( 2)                        char* buf, int count, char** res, int res_size)
( 3) {
( 4) /* Local macro to ensure large enough buffer. */
( 5) #define ENSURE(N)                               \
( 6)    do {                                         \
( 7)        if (res_size < N) {                      \
( 8)            *res = ALLOC(N);                     \
( 9)        }                                        \
(10)    } while(0)

(11)    UdsData *ud = (UdsData *) handle;

(12)    switch (command) {
(13)    case 'S':
(14)        {
(15)            ENSURE(13);
(16)            **res = 0;
(17)            put_packet_length((*res) + 1, ud->received);
(18)            put_packet_length((*res) + 5, ud->sent);
(19)            put_packet_length((*res) + 9, driver_sizeq(ud->port));
(20)            return 13;
(21)        }
(22)    case 'C':
(23)        if (ud->type < portTypeCommand) {
(24)            return report_control_error(res, res_size, "einval");
(25)        }
(26)        ud->type = portTypeCommand;
(27)        driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 0);
(28)        ENSURE(1);
(29)        **res = 0;
(30)        return 1;
(31)    case 'I':
(32)        if (ud->type < portTypeCommand) {
(33)            return report_control_error(res, res_size, "einval");
(34)        }
(35)        ud->type = portTypeIntermediate;
(36)        driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 0);
(37)        ENSURE(1);
(38)        **res = 0;
(39)        return 1;
(40)    case 'D':
(41)        if (ud->type < portTypeCommand) {
(42)            return report_control_error(res, res_size, "einval");
(43)        }
(44)        ud->type = portTypeData;
(45)        do_recv(ud);
(46)        ENSURE(1);
(47)        **res = 0;
(48)        return 1;
(49)    case 'N':
(50)        if (ud->type != portTypeListener) {
(51)            return report_control_error(res, res_size, "einval");
(52)        }
(53)        ENSURE(5);
(54)        (*res)[0] = 0;
(55)        put_packet_length((*res) + 1, ud->fd);
(56)        return 5;
(57)    case 'T': /* tick */
(58)        if (ud->type != portTypeData) {
(59)            return report_control_error(res, res_size, "einval");
(60)        }
(61)        do_send(ud,"",0);
(62)        ENSURE(1);
(63)        **res = 0;
(64)        return 1;
(65)    case 'R':
(66)        if (ud->type != portTypeListener) {
(67)            return report_control_error(res, res_size, "einval");
(68)        }
(69)        ENSURE(2);
(70)        (*res)[0] = 0;
(71)        (*res)[1] = ud->creation;
(72)        return 2;
(73)    default:
(74)        return report_control_error(res, res_size, "einval");
(75)    }
(76) #undef ENSURE
(77) }

ENSURE(第 5-10 行)用于确保缓冲区足够大以容纳答案。我们根据命令进行切换并采取行动。我们始终在处于 data 模式的端口上激活读取选择(通过在第 45 行调用 do_recv 实现),但我们在 intermediatecommand 模式下关闭读取选择(第 27 行和第 36 行)。

驱动程序的其余部分或多或少特定于 UDS,不具有普遍意义。

综合运用

要测试分布,可以使用 net_kernel:start/1 函数。它很有用,因为它可以在运行的系统上启动分布,从而可以执行跟踪/调试。 net_kernel:start/1 例程接受一个列表作为其唯一参数。列表中的第一个元素应该是节点名称(不带“@hostname”)的原子。第二个(也是最后一个)元素应该是原子 shortnameslongnames 之一。在示例中,首选 shortnames

为了使 net_kernel 找出要使用的分布模块,需要使用命令行参数 -proto_dist。 后面跟着一个或多个分布模块的名称,需要去掉后缀 "_dist",也就是说,分布模块 uds_dist 被指定为 -proto_dist uds

如果没有使用epmd(TCP端口映射器守护进程),还需要指定命令行选项-no_epmd,这使得Erlang跳过epmd的启动,无论是作为操作系统进程还是作为Erlang副本。

分布模块所在的目录路径必须在启动时已知。这可以通过在命令行上指定 -pa <path> 或构建包含用于您的分布协议的应用程序的启动脚本来实现。(在 uds_dist 协议中,只需将 uds_dist 应用程序添加到脚本中。)

如果指定了上述所有内容,并且命令行中存在 -sname <name> 标志,则分布会在启动时启动。

示例 1

$ erl -pa $ERL_TOP/lib/kernel/examples/uds_dist/ebin -proto_dist uds -no_epmd
Erlang (BEAM) emulator version 5.0

Eshell V5.0  (abort with ^G)
1> net_kernel:start([bing,shortnames]).
{ok,<0.30.0>}
(bing@hador)2>

示例 2

$ erl -pa $ERL_TOP/lib/kernel/examples/uds_dist/ebin -proto_dist uds \
      -no_epmd -sname bong
Erlang (BEAM) emulator version 5.0

Eshell V5.0  (abort with ^G)
(bong@hador)1>

可以使用 ERL_FLAGS 环境变量来存储复杂的参数。

$ ERL_FLAGS=-pa $ERL_TOP/lib/kernel/examples/uds_dist/ebin \
      -proto_dist uds -no_epmd
$ export ERL_FLAGS
$ erl -sname bang
Erlang (BEAM) emulator version 5.0

Eshell V5.0  (abort with ^G)
(bang@hador)1>

ERL_FLAGS 不应包含节点名称。