关于消息传递的一些说明

2021年3月19日 · 作者:John Högberg

消息传递一直是 Erlang 的核心,虽然有相当完善的文档,但我们避免深入太多细节,以便在实现时有更大的自由度。不过,在博客文章中描述它没有任何问题,所以让我们仔细看看!

Erlang 进程通过互相发送信号(不要与 Unix 信号混淆)进行通信。信号有很多种,而消息只是最常见的一种。实际上,所有涉及多个进程的操作都在内部使用信号:例如,link/1 函数是通过让相关进程来回通信直到它们就链接达成一致来实现的。

这有助于我们避免大量锁,并且本身可以写一篇有趣的博客文章,但现在我们只需要记住两件事:所有信号(包括消息)都在后台持续接收和处理,并且它们具有定义的顺序

两个进程之间的信号保证按照它们发送的顺序到达。换句话说,如果进程 A 向进程 B 发送信号 1,然后发送信号 2,则保证信号 1 在信号 2 之前到达。

为什么这很重要?考虑请求-响应模式

%% Send a monitor signal to `Pid`, requesting a 'DOWN' message
%% when `Pid` dies.
Mref = monitor(process, Pid),
%% Send a message signal to `Pid` with our `Request`
Pid ! {self(), Mref, Request},
receive
    {Mref, Response} ->
        %% Send a demonitor signal to `Pid`, and remove the
        %% corresponding 'DOWN' message that might have
        %% arrived in the meantime.
        erlang:demonitor(Mref, [flush]),
        {ok, Response};
    {'DOWN', Mref, _, _, Reason} ->
        {error, Reason}
end

由于已死的进程无法发送消息,我们知道响应必须在任何最终的 'DOWN' 消息之前到达,但是如果没有保证的顺序,'DOWN' 消息可能会在响应之前到达,我们将无法知道是否会有响应,这将非常令人恼火。

拥有定义的顺序可以为我们节省很多麻烦,而且不会付出太多代价,但是保证仅限于此。如果多个进程向一个公共进程发送信号,则它们可以以任何顺序到达,即使您“知道”其中一个信号是先发送的。例如,以下事件序列是合法的并且完全有可能的

  1. AB 发送信号 1
  2. AC 发送信号 2
  3. C 响应信号 2,向 B 发送信号 3
  4. B 接收信号 3
  5. B 接收信号 1

幸运的是,全局顺序很少需要,并且很容易自己施加(在分布式案例之外):只需让所有相关方与一个公共进程同步即可。

发送消息 #

发送消息很简单:我们尝试查找与进程标识符关联的进程,如果存在,则将消息插入其信号队列。

消息总是在插入队列之前被复制。尽管这听起来很浪费,但它大大减少了垃圾收集 (GC) 的延迟,因为 GC 从来不需要查看单个进程之外的内容。过去曾尝试过非复制实现,但事实证明它们不合适,因为对于 Erlang 设计用于构建的软实时系统来说,低延迟比纯吞吐量更重要。

默认情况下,消息直接复制到接收进程的堆中,但是当不可能(或不希望 - 请参阅 message_queue_data 标志)时,我们会在堆外分配消息。

内存分配使得这种“堆外”消息稍微昂贵一些,但它们对于接收大量消息的进程非常有用。在复制消息时我们不需要与接收者交互 - 仅在将其添加到队列时 - 并且由于进程查看消息的唯一方法是通过在 receive 表达式中匹配它们,GC 不需要考虑不匹配的消息,这进一步减少了延迟。

将消息发送到其他 Erlang 节点上的进程的工作方式相同,尽管现在存在消息在传输过程中丢失的风险。只要节点之间的分布式链接处于活动状态,就保证消息的传递,但是当链接断开时,情况就会变得棘手。

在远程进程(或节点)上使用 monitor/2 将会告诉您何时发生这种情况,就像进程死了一样(原因是 noconnection),但这并不总是奏效:链接可能在另一端接收和处理消息之后断开,我们所知道的只是链接在任何最终响应到达之前断开了。

与所有其他事情一样,没有免费的午餐,您需要决定您的应用程序应如何处理这些情况

接收消息 #

人们可能会猜测进程通过 receive 表达式接收消息,但 receive 有点用词不当。与所有其他信号一样,进程在后台连续处理它们,将接收到的消息从信号队列移动到消息队列。

receive 在消息队列中搜索匹配的消息(按照它们到达的顺序),如果未找到消息,则等待新消息。搜索消息队列而不是信号队列意味着它不必担心发送消息的进程,这大大提高了性能。

这种“选择性接收”特定消息的能力非常方便:我们并不总是在可以决定如何处理消息的上下文中,并且不得不手动拖着所有未处理的消息肯定很烦人。

不幸的是,将搜索扫到地毯下并不会使其消失

receive
    {reply, Result} ->
        {ok, Result}
end

如果队列中的下一个消息与 {reply, Result} 匹配,则上面的表达式会立即完成,但是如果没有匹配的消息,它必须遍历所有消息才能放弃。当排队的消息很多时,这会很昂贵,这对于类似服务器的进程很常见,并且由于 receive 表达式可以匹配几乎任何内容,因此几乎无法优化搜索本身。

我们目前所做的唯一优化是在我们知道消息在某个点之前不可能存在时,标记搜索的起点。让我们重新审视请求-响应模式

Mref = monitor(process, Pid),
Pid ! {self(), Mref, Request},
receive
    {Mref, Response} ->
        erlang:demonitor(Mref, [flush]),
        {ok, Response};
    {'DOWN', Mref, _, _, Reason} ->
        {error, Reason}
end

由于 monitor/2 创建的引用是全局唯一的,并且在上述调用之前不可能存在,并且 receive 仅匹配包含所述引用的消息,因此我们不需要查看在那之前接收到的任何消息。

即使在具有超长消息队列的进程上,这也使这种模式非常有效,但不幸的是,这不是我们可以在一般情况下做的事情。尽管您作为程序员可以确定某个响应必须在其请求之后到达,即使没有引用,例如通过使用您自己的序列号,但编译器无法读取您的意图,并且必须假设您需要任何匹配的消息。

弄清楚上面的优化是否已启动目前非常令人恼火。它需要检查 BEAM 汇编,即使这样,由于一些令人恼火的限制,也不能保证它会起作用

  1. 我们一次只支持一个消息位置:一个创建引用、调用另一个使用此优化的函数,然后返回到具有第一个引用的 receive 的函数,最终将搜索整个消息队列。
  2. 它仅在单个函数子句中起作用:引用创建和 receive 都需要彼此相邻,并且您不能有多个函数调用公共的 receive 助手。

我们在即将发布的 OTP 24 版本中解决了这些缺点,并添加了一个编译器选项来帮助您发现它在哪里应用

$ erlc +recv_opt_info example.erl
-module(example).
-export([t/2]).

t(Pid, Request) ->
    %% example.erl:5: OPTIMIZED: reference used to mark a 
    %%                           message queue position
    Mref = monitor(process, Pid),
    Pid ! {self(), Mref, Request},
    %% example.erl:7: INFO: passing reference created by
    %%                      monitor/2 at example.erl:5
    await_result(Mref).

await_result(Mref) ->
    %% example.erl:10: OPTIMIZED: all clauses match reference
    %%                            in function parameter 1
    receive
        {Mref, Response} ->
            erlang:demonitor(Mref, [flush]),
            {ok, Response};
        {'DOWN', Mref, _, _, Reason} ->
            {error, Reason}
    end.