查看源代码 如何实现驱动程序
注意
本节是很久以前编写的。大部分内容仍然有效,因为它解释了重要的概念,但是这是为较旧的驱动程序接口编写的,因此示例不再起作用。建议读者同时阅读
erl_driver
和driver_entry
文档。
简介
本节介绍如何为 Erlang 构建自己的驱动程序。
Erlang 中的驱动程序是用 C 编写的库,它链接到 Erlang 仿真器并从 Erlang 中调用。当 C 比 Erlang 更合适时,可以使用驱动程序来加速操作,或者提供对 Erlang 无法直接访问的 OS 资源的访问。
驱动程序可以动态加载,作为共享库(在 Windows 上称为 DLL),也可以静态加载,在编译和链接时与仿真器链接。这里仅介绍动态加载的驱动程序,静态链接的驱动程序超出本节范围。
警告
加载驱动程序时,它在仿真器的上下文中执行,共享相同的内存和相同的线程。这意味着驱动程序中的所有操作都必须是非阻塞的,并且驱动程序中的任何崩溃都会使整个仿真器崩溃。简而言之,请小心。
示例驱动程序
本节介绍一个简单的驱动程序,用于使用 libpq C 客户端库访问 postgres 数据库。之所以使用 Postgres 是因为它免费且开源。有关 postgres 的信息,请参阅 www.postgres.org。
该驱动程序是同步的,它使用客户端库的同步调用。这只是为了简单起见,但并不好,因为它在等待数据库时会暂停仿真器。下面使用异步示例驱动程序对此进行了改进。
代码很简单:Erlang 和驱动程序之间的所有通信都使用 port_control/3
完成,驱动程序使用 rbuf
返回数据。
Erlang 驱动程序仅导出一个函数:驱动程序入口函数。这使用宏 DRIVER_INIT
定义,该宏返回指向 C struct
的指针,该结构包含从仿真器调用的入口点。 struct
定义了仿真器调用以调用驱动程序的入口,对于驱动程序未定义和使用的入口,则使用 NULL
指针。
当使用 open_port/2
将驱动程序作为端口打开时,将调用 start
入口。在这里,我们为用户数据结构分配内存。每次仿真器调用我们时,都会传递此用户数据。首先,我们存储驱动程序句柄,因为在以后的调用中需要它。我们为 LibPQ 使用的连接句柄分配内存。我们还通过调用 set_port_control_flags
设置标志 PORT_CONTROL_FLAG_BINARY
,将端口设置为返回分配的驱动程序二进制文件。(这是因为我们不知道我们的数据是否适合 control
的结果缓冲区,该缓冲区具有仿真器设置的默认大小 64 字节。)
加载驱动程序时,会调用入口 init
。但是,我们不使用它,因为它只执行一次,并且我们希望有多个驱动程序实例的可能性。
关闭端口时,会调用 stop
入口。
当 Erlang 代码调用 port_control/3
时,会从仿真器调用 control
入口,以执行实际工作。我们定义了一组简单的命令:connect
用于登录到数据库,disconnect
用于注销,select
用于发送 SQL 查询并获取结果。所有结果都通过 rbuf
返回。 ei
中的库 erl_interface
用于以二进制项格式编码数据。结果以二进制项的形式返回给仿真器,因此在 Erlang 中调用 binary_to_term
将结果转换为项形式。
代码可在 erts
的 sample
目录中的 pg_sync.c
中找到。
驱动程序入口包含仿真器将调用的函数。在此示例中,仅提供 start
、stop
和 control
/* Driver interface declarations */
static ErlDrvData start(ErlDrvPort port, char *command);
static void stop(ErlDrvData drv_data);
static int control(ErlDrvData drv_data, unsigned int command, char *buf,
int len, char **rbuf, int rlen);
static ErlDrvEntry pq_driver_entry = {
NULL, /* init */
start,
stop,
NULL, /* output */
NULL, /* ready_input */
NULL, /* ready_output */
"pg_sync", /* the name of the driver */
NULL, /* finish */
NULL, /* handle */
control,
NULL, /* timeout */
NULL, /* outputv */
NULL, /* ready_async */
NULL, /* flush */
NULL, /* call */
NULL /* event */
};
我们有一个结构来存储驱动程序所需的状态,在这种情况下,我们只需要保留数据库连接
typedef struct our_data_s {
PGconn* conn;
} our_data_t;
我们定义的控制代码如下
/* Keep the following definitions in alignment with the
* defines in erl_pq_sync.erl
*/
#define DRV_CONNECT 'C'
#define DRV_DISCONNECT 'D'
#define DRV_SELECT 'S'
这会返回驱动程序结构。宏 DRIVER_INIT
定义了唯一导出的函数。所有其他函数都是静态的,不会从库中导出。
/* INITIALIZATION AFTER LOADING */
/*
* This is the init function called after this driver has been loaded.
* It must *not* be declared static. Must return the address to
* the driver entry.
*/
DRIVER_INIT(pq_drv)
{
return &pq_driver_entry;
}
这里进行了一些初始化,从 open_port/2
调用 start
。数据将传递给 control
和 stop
。
/* DRIVER INTERFACE */
static ErlDrvData start(ErlDrvPort port, char *command)
{
our_data_t* data;
data = (our_data_t*)driver_alloc(sizeof(our_data_t));
data->conn = NULL;
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
return (ErlDrvData)data;
}
我们调用断开连接以从数据库中注销。(这应该从 Erlang 完成,但以防万一。)
static int do_disconnect(our_data_t* data, ei_x_buff* x);
static void stop(ErlDrvData drv_data)
{
our_data_t* data = (our_data_t*)drv_data;
do_disconnect(data, NULL);
driver_free(data);
}
我们仅使用二进制格式将数据返回到仿真器;输入数据是 connect
和 select
的字符串参数。返回的数据包含 Erlang 项。
函数 get_s
和 ei_x_to_new_binary
是用于缩短代码的实用工具。 get_s
复制字符串并将其以零结尾,因为 postgres 客户端库需要这样做。 ei_x_to_new_binary
接受一个 ei_x_buff
缓冲区,分配一个二进制文件,并将数据复制到那里。此二进制文件在 *rbuf
中返回。(请注意,此二进制文件由仿真器释放,而不是由我们释放。)
static char* get_s(const char* buf, int len);
static int do_connect(const char *s, our_data_t* data, ei_x_buff* x);
static int do_select(const char* s, our_data_t* data, ei_x_buff* x);
/* As we are operating in binary mode, the return value from control
* is irrelevant, as long as it is not negative.
*/
static int control(ErlDrvData drv_data, unsigned int command, char *buf,
int len, char **rbuf, int rlen)
{
int r;
ei_x_buff x;
our_data_t* data = (our_data_t*)drv_data;
char* s = get_s(buf, len);
ei_x_new_with_version(&x);
switch (command) {
case DRV_CONNECT: r = do_connect(s, data, &x); break;
case DRV_DISCONNECT: r = do_disconnect(data, &x); break;
case DRV_SELECT: r = do_select(s, data, &x); break;
default: r = -1; break;
}
*rbuf = (char*)ei_x_to_new_binary(&x);
ei_x_free(&x);
driver_free(s);
return r;
}
do_connect
是我们登录到数据库的地方。如果连接成功,我们将连接句柄存储在驱动程序数据中,并返回 'ok'
。否则,我们返回来自 postgres 的错误消息,并在驱动程序数据中存储 NULL
。
static int do_connect(const char *s, our_data_t* data, ei_x_buff* x)
{
PGconn* conn = PQconnectdb(s);
if (PQstatus(conn) != CONNECTION_OK) {
encode_error(x, conn);
PQfinish(conn);
conn = NULL;
} else {
encode_ok(x);
}
data->conn = conn;
return 0;
}
如果我们已连接(并且连接句柄不是 NULL
),我们将从数据库中注销。我们需要检查是否应该编码一个 'ok'
,因为我们可以从不向仿真器返回数据的函数 stop
来到这里
static int do_disconnect(our_data_t* data, ei_x_buff* x)
{
if (data->conn == NULL)
return 0;
PQfinish(data->conn);
data->conn = NULL;
if (x != NULL)
encode_ok(x);
return 0;
}
我们执行查询并对结果进行编码。编码在另一个 C 模块 pg_encode.c
中完成,该模块也作为示例代码提供。
static int do_select(const char* s, our_data_t* data, ei_x_buff* x)
{
PGresult* res = PQexec(data->conn, s);
encode_result(x, res, data->conn);
PQclear(res);
return 0;
}
这里我们检查来自 postgres 的结果。如果是数据,我们将它编码为包含列数据的列表的列表。来自 postgres 的所有内容都是 C 字符串,因此我们使用 ei_x_encode_string
将结果作为字符串发送到 Erlang。(列表的开头包含列名。)
void encode_result(ei_x_buff* x, PGresult* res, PGconn* conn)
{
int row, n_rows, col, n_cols;
switch (PQresultStatus(res)) {
case PGRES_TUPLES_OK:
n_rows = PQntuples(res);
n_cols = PQnfields(res);
ei_x_encode_tuple_header(x, 2);
encode_ok(x);
ei_x_encode_list_header(x, n_rows+1);
ei_x_encode_list_header(x, n_cols);
for (col = 0; col < n_cols; ++col) {
ei_x_encode_string(x, PQfname(res, col));
}
ei_x_encode_empty_list(x);
for (row = 0; row < n_rows; ++row) {
ei_x_encode_list_header(x, n_cols);
for (col = 0; col < n_cols; ++col) {
ei_x_encode_string(x, PQgetvalue(res, row, col));
}
ei_x_encode_empty_list(x);
}
ei_x_encode_empty_list(x);
break;
case PGRES_COMMAND_OK:
ei_x_encode_tuple_header(x, 2);
encode_ok(x);
ei_x_encode_string(x, PQcmdTuples(res));
break;
default:
encode_error(x, conn);
break;
}
}
编译和链接示例驱动程序
该驱动程序将被编译并链接到共享库(Windows 上的 DLL)。使用 gcc,这是通过链接标志 -shared
和 -fpic
完成的。由于我们使用了 ei
库,因此我们也应该将其包括在内。 ei
有几个版本,编译为调试或非调试以及多线程或单线程。在示例的 makefile 中, obj
目录用于 ei
库,这意味着我们使用非调试的单线程版本。
在 Erlang 中将驱动程序作为端口调用
在可以从 Erlang 调用驱动程序之前,必须加载并打开它。加载是使用 erl_ddll
模块完成的(加载动态驱动程序的 erl_ddll
驱动程序实际上本身就是一个驱动程序)。如果加载成功,则可以使用 open_port/2
打开端口。端口名称必须与共享库的名称和驱动程序入口结构中的名称匹配。
打开端口后,即可调用驱动程序。在 pg_sync
示例中,我们没有任何来自端口的数据,只有来自 port_control/3
的返回值。
以下代码是同步 postgres 驱动程序的 Erlang 部分,pg_sync.erl
-module(pg_sync).
-define(DRV_CONNECT, 1).
-define(DRV_DISCONNECT, 2).
-define(DRV_SELECT, 3).
-export([connect/1, disconnect/1, select/2]).
connect(ConnectStr) ->
case erl_ddll:load_driver(".", "pg_sync") of
ok -> ok;
{error, already_loaded} -> ok;
E -> exit({error, E})
end,
Port = open_port({spawn, ?MODULE}, []),
case binary_to_term(port_control(Port, ?DRV_CONNECT, ConnectStr)) of
ok -> {ok, Port};
Error -> Error
end.
disconnect(Port) ->
R = binary_to_term(port_control(Port, ?DRV_DISCONNECT, "")),
port_close(Port),
R.
select(Port, Query) ->
binary_to_term(port_control(Port, ?DRV_SELECT, Query)).
API 很简单
connect/1
加载驱动程序,打开它,并登录到数据库,如果成功则返回 Erlang 端口。select/2
将查询发送到驱动程序并返回结果。disconnect/1
关闭数据库连接和驱动程序。(但是,它不会卸载它。)
连接字符串是 postgres 的连接字符串。
使用 erl_ddll:load_driver/2
加载驱动程序。如果此操作成功,或者已经加载,则将其打开。这将调用驱动程序中的 start
函数。
我们使用 port_control/3
函数进行所有对驱动程序的调用。驱动程序的结果会立即返回,并通过调用 binary_to_term/1
转换为项。(我们相信从驱动程序返回的项是格式良好的,否则 binary_to_term/1
调用可以包含在 catch
中。)
示例异步驱动程序
有时数据库查询可能需要很长时间才能完成,在我们的 pg_sync
驱动程序中,当驱动程序正在执行其工作时,模拟器会暂停。 这通常是不可接受的,因为没有其他 Erlang 进程有机会执行任何操作。为了改进我们的 postgres 驱动程序,我们使用 LibPQ 中的异步调用重新实现了它。
该驱动程序的异步版本位于示例文件 pg_async.c
和 pg_asyng.erl
中。
/* Driver interface declarations */
static ErlDrvData start(ErlDrvPort port, char *command);
static void stop(ErlDrvData drv_data);
static int control(ErlDrvData drv_data, unsigned int command, char *buf,
int len, char **rbuf, int rlen);
static void ready_io(ErlDrvData drv_data, ErlDrvEvent event);
static ErlDrvEntry pq_driver_entry = {
NULL, /* init */
start,
stop,
NULL, /* output */
ready_io, /* ready_input */
ready_io, /* ready_output */
"pg_async", /* the name of the driver */
NULL, /* finish */
NULL, /* handle */
control,
NULL, /* timeout */
NULL, /* outputv */
NULL, /* ready_async */
NULL, /* flush */
NULL, /* call */
NULL /* event */
};
typedef struct our_data_t {
PGconn* conn;
ErlDrvPort port;
int socket;
int connecting;
} our_data_t;
与 pg_sync.c
相比,一些事情发生了变化:我们使用条目 ready_io
来处理 ready_input
和 ready_output
,只有当有数据需要从套接字读取时,才会从模拟器调用此条目。(实际上,该套接字在模拟器内部的 select
函数中使用,当套接字发出信号,表明有数据要读取时,会调用 ready_input
条目。有关此内容的更多信息,请参见下文。)
我们的驱动程序数据也进行了扩展,我们跟踪用于与 postgres 通信的套接字,以及当我们使用 driver_output
将数据发送到端口时所需的端口。我们有一个标志 connecting
来表示驱动程序是在等待连接还是等待查询结果。(这是必要的,因为当连接时和有查询结果时都会调用条目 ready_io
。)
static int do_connect(const char *s, our_data_t* data)
{
PGconn* conn = PQconnectStart(s);
if (PQstatus(conn) == CONNECTION_BAD) {
ei_x_buff x;
ei_x_new_with_version(&x);
encode_error(&x, conn);
PQfinish(conn);
conn = NULL;
driver_output(data->port, x.buff, x.index);
ei_x_free(&x);
}
PQconnectPoll(conn);
int socket = PQsocket(conn);
data->socket = socket;
driver_select(data->port, (ErlDrvEvent)socket, DO_READ, 1);
driver_select(data->port, (ErlDrvEvent)socket, DO_WRITE, 1);
data->conn = conn;
data->connecting = 1;
return 0;
}
connect
函数看起来也有所不同。我们使用异步的 PQconnectStart
函数进行连接。连接启动后,我们使用 PQsocket
获取连接的套接字。此套接字与 driver_select
函数一起使用,以等待连接。当套接字准备好进行输入或输出时,会调用 ready_io
函数。
请注意,我们仅在此处出现错误时才返回数据(使用 driver_output
),否则我们将等待连接完成,在这种情况下,我们的 ready_io
函数将被调用。
static int do_select(const char* s, our_data_t* data)
{
data->connecting = 0;
PGconn* conn = data->conn;
/* if there's an error return it now */
if (PQsendQuery(conn, s) == 0) {
ei_x_buff x;
ei_x_new_with_version(&x);
encode_error(&x, conn);
driver_output(data->port, x.buff, x.index);
ei_x_free(&x);
}
/* else wait for ready_output to get results */
return 0;
}
do_select
函数启动选择,如果没有立即发生错误则返回。结果在 ready_io
被调用时返回。
static void ready_io(ErlDrvData drv_data, ErlDrvEvent event)
{
PGresult* res = NULL;
our_data_t* data = (our_data_t*)drv_data;
PGconn* conn = data->conn;
ei_x_buff x;
ei_x_new_with_version(&x);
if (data->connecting) {
ConnStatusType status;
PQconnectPoll(conn);
status = PQstatus(conn);
if (status == CONNECTION_OK)
encode_ok(&x);
else if (status == CONNECTION_BAD)
encode_error(&x, conn);
} else {
PQconsumeInput(conn);
if (PQisBusy(conn))
return;
res = PQgetResult(conn);
encode_result(&x, res, conn);
PQclear(res);
for (;;) {
res = PQgetResult(conn);
if (res == NULL)
break;
PQclear(res);
}
}
if (x.index > 1) {
driver_output(data->port, x.buff, x.index);
if (data->connecting)
driver_select(data->port, (ErlDrvEvent)data->socket, DO_WRITE, 0);
}
ei_x_free(&x);
}
当从 postgres 获取的套接字准备好进行输入或输出时,会调用 ready_io
函数。在此处,我们首先检查是否正在连接到数据库。在这种情况下,我们检查连接状态,如果连接成功,则返回 OK,否则返回错误。如果尚未建立连接,我们只需返回;ready_io
将再次被调用。
如果我们有来自连接的结果,由 x
缓冲区中的数据指示,我们不再需要在输出 (ready_output
) 上进行选择,因此我们通过调用 driver_select
来删除此选项。
如果我们没有连接,我们将等待来自 PQsendQuery
的结果,因此我们获取结果并返回它。编码使用与前面示例中相同的函数完成。
此处将添加错误处理,例如,检查套接字是否仍然打开,但这只是一个简单的示例。
异步驱动程序的 Erlang 部分由示例文件 pg_async.erl
组成。
-module(pg_async).
-define(DRV_CONNECT, $C).
-define(DRV_DISCONNECT, $D).
-define(DRV_SELECT, $S).
-export([connect/1, disconnect/1, select/2]).
connect(ConnectStr) ->
case erl_ddll:load_driver(".", "pg_async") of
ok -> ok;
{error, already_loaded} -> ok;
_ -> exit({error, could_not_load_driver})
end,
Port = open_port({spawn, ?MODULE}, [binary]),
port_control(Port, ?DRV_CONNECT, ConnectStr),
case return_port_data(Port) of
ok ->
{ok, Port};
Error ->
Error
end.
disconnect(Port) ->
port_control(Port, ?DRV_DISCONNECT, ""),
R = return_port_data(Port),
port_close(Port),
R.
select(Port, Query) ->
port_control(Port, ?DRV_SELECT, Query),
return_port_data(Port).
return_port_data(Port) ->
receive
{Port, {data, Data}} ->
binary_to_term(Data)
end.
Erlang 代码略有不同,因为我们不会从 port_control/3
同步返回结果,而是从 driver_output
中获取结果,作为消息队列中的数据。上面的函数 return_port_data
接收来自端口的数据。由于数据是二进制格式,我们使用 binary_to_term/1
将其转换为 Erlang 项。请注意,驱动程序以二进制模式打开(open_port/2
调用时带有选项 [binary]
)。这意味着从驱动程序发送到模拟器的数据以二进制形式发送。如果没有选项 binary
,它们将是整数列表。
使用 driver_async 的异步驱动程序
作为最后一个示例,我们演示了如何使用 driver_async
。我们还使用驱动程序术语接口。驱动程序是用 C++ 编写的。这使我们能够使用 STL 中的算法。我们使用 next_permutation
算法来获取整数列表的下一个排列。对于较大的列表(> 100,000 个元素),这需要一些时间,因此我们将其作为异步任务执行。
驱动程序的异步 API 很复杂。首先,必须准备好工作。在示例中,这是在 output
中完成的。我们可以使用 control
,但我们希望示例中有所不同。在我们的驱动程序中,我们分配一个结构,其中包含异步任务执行工作所需的任何内容。这是在主模拟器线程中完成的。然后从驱动程序线程(与主模拟器线程分离)调用异步函数。请注意,驱动程序函数不是可重入的,因此不应使用它们。最后,在函数完成后,从主模拟器线程调用驱动程序回调 ready_async
,这是我们将结果返回给 Erlang 的地方。(我们不能从异步函数中返回结果,因为我们不能调用驱动程序函数。)
以下代码来自示例文件 next_perm.cc
。驱动程序条目看起来像以前一样,但也包含回调 ready_async
。
static ErlDrvEntry next_perm_driver_entry = {
NULL, /* init */
start,
NULL, /* stop */
output,
NULL, /* ready_input */
NULL, /* ready_output */
"next_perm", /* the name of the driver */
NULL, /* finish */
NULL, /* handle */
NULL, /* control */
NULL, /* timeout */
NULL, /* outputv */
ready_async,
NULL, /* flush */
NULL, /* call */
NULL /* event */
};
output
函数分配异步函数的工作区。由于我们使用 C++,我们使用一个结构,并将数据放入其中。我们必须复制原始数据,在从 output
函数返回后,该数据将不再有效,do_perm
函数将在稍后从另一个线程调用。我们在这里不返回数据,而是稍后从 ready_async
回调中发送数据。
async_data
传递给 do_perm
函数。我们不使用 async_free
函数(driver_async
的最后一个参数),它仅在以编程方式取消任务时使用。
struct our_async_data {
bool prev;
vector<int> data;
our_async_data(ErlDrvPort p, int command, const char* buf, int len);
};
our_async_data::our_async_data(ErlDrvPort p, int command,
const char* buf, int len)
: prev(command == 2),
data((int*)buf, (int*)buf + len / sizeof(int))
{
}
static void do_perm(void* async_data);
static void output(ErlDrvData drv_data, char *buf, int len)
{
if (*buf < 1 || *buf > 2) return;
ErlDrvPort port = reinterpret_cast<ErlDrvPort>(drv_data);
void* async_data = new our_async_data(port, *buf, buf+1, len);
driver_async(port, NULL, do_perm, async_data, do_free);
}
在 do_perm
中,我们执行工作,操作在 output
中分配的结构。
static void do_perm(void* async_data)
{
our_async_data* d = reinterpret_cast<our_async_data*>(async_data);
if (d->prev)
prev_permutation(d->data.begin(), d->data.end());
else
next_permutation(d->data.begin(), d->data.end());
}
在 ready_async
函数中,输出将发送回模拟器。我们使用驱动程序术语格式而不是 ei
。这是直接将 Erlang 项发送到驱动程序的唯一方法,而无需 Erlang 代码调用 binary_to_term/1
。在简单的示例中,这效果很好,我们无需使用 ei
来处理二进制术语格式。
当数据返回时,我们将取消分配数据。
static void ready_async(ErlDrvData drv_data, ErlDrvThreadData async_data)
{
ErlDrvPort port = reinterpret_cast<ErlDrvPort>(drv_data);
our_async_data* d = reinterpret_cast<our_async_data*>(async_data);
int n = d->data.size(), result_n = n*2 + 3;
ErlDrvTermData *result = new ErlDrvTermData[result_n], *rp = result;
for (vector<int>::iterator i = d->data.begin();
i != d->data.end(); ++i) {
*rp++ = ERL_DRV_INT;
*rp++ = *i;
}
*rp++ = ERL_DRV_NIL;
*rp++ = ERL_DRV_LIST;
*rp++ = n+1;
driver_output_term(port, result, result_n);
delete[] result;
delete d;
}
此驱动程序像其他驱动程序一样从 Erlang 调用。但是,由于我们使用 driver_output_term
,因此无需调用 binary_to_term/1
。Erlang 代码位于示例文件 next_perm.erl
中。
输入将更改为整数列表并发送到驱动程序。
-module(next_perm).
-export([next_perm/1, prev_perm/1, load/0, all_perm/1]).
load() ->
case whereis(next_perm) of
undefined ->
case erl_ddll:load_driver(".", "next_perm") of
ok -> ok;
{error, already_loaded} -> ok;
E -> exit(E)
end,
Port = open_port({spawn, "next_perm"}, []),
register(next_perm, Port);
_ ->
ok
end.
list_to_integer_binaries(L) ->
[<<I:32/integer-native>> || I <- L].
next_perm(L) ->
next_perm(L, 1).
prev_perm(L) ->
next_perm(L, 2).
next_perm(L, Nxt) ->
load(),
B = list_to_integer_binaries(L),
port_control(next_perm, Nxt, B),
receive
Result ->
Result
end.
all_perm(L) ->
New = prev_perm(L),
all_perm(New, L, [New]).
all_perm(L, L, Acc) ->
Acc;
all_perm(L, Orig, Acc) ->
New = prev_perm(L),
all_perm(New, Orig, [New | Acc]).