[openwrt] ubus实现进程间通信举例

摘自:https://blog.csdn.net/jasonchen_gbd/article/details/46055885

上一篇文章介绍了ubus的组件和实现原理,本文通过代码实例介绍使用ubus进行进程间通信的三种方式。

1. invoke的方式实现端对端通信

最简单的情景就是一个提供服务的server端,一个请求服务的client端,client请求server的服务。
下面的例子中,server注册了一个名为“scan_prog”的对象,该对象中提供一个“scan”方法:
ubus_invoke.h:
#ifndef __UBUS_INVOKE_H__
#define __UBUS_INVOKE_H__
#include <json/json.h>
#include <libubox/blobmsg_json.h>
 
 
struct prog_attr {
    char name[64];
    int chn_id;
};
#define PROG_MAX    8
 
 
#endif  /* __UBUS_INVOKE_H__ */

invoke_server.c:

  1 #include <libubox/uloop.h>
  2 #include <libubox/ustream.h>
  3 #include <libubox/utils.h>
  4 #include <libubus.h>
  5 #include <json/json.h>
  6 #include <libubox/blobmsg_json.h>
  7 #include "ubus_invoke.h"
  8  
  9 static struct ubus_context * ctx = NULL;
 10 static struct blob_buf b;
 11 static const char * sock_path;
 12  
 13 static struct prog_attr uri_list[PROG_MAX] = 
 14 {
 15     {"program_beijing", 1},
 16     {"program_guangzhou", 2},
 17     {"program_shanghai", 3},
 18     {"", -1},
 19 };
 20  
 21 enum
 22 {
 23     SCAN_CHNID,
 24     SCAN_POLICY_MAX,
 25 };
 26  
 27 static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = {
 28     [SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32},
 29 };
 30  
 31 static int ubus_start_scan(struct ubus_context *ctx, struct ubus_object *obj,
 32               struct ubus_request_data *req, const char *method,
 33               struct blob_attr *msg)
 34 {
 35     int ret = -1;
 36     void * json_list = NULL;
 37     void * json_uri = NULL;
 38     int idx;
 39  
 40     blob_buf_init(&b, 0);
 41     
 42     struct blob_attr *tb[SCAN_POLICY_MAX];
 43     blobmsg_parse(scan_policy, SCAN_POLICY_MAX, tb, blob_data(msg), blob_len(msg));
 44  
 45     /*
 46     本例子中,如果请求特定的节目号,返回节目名称。
 47     如果请求节目号是0,则返回所有节目列表。
 48     */
 49     if (tb[SCAN_CHNID] != NULL)
 50     {
 51         int chnid = blobmsg_get_u32(tb[SCAN_CHNID]);
 52  
 53         if (chnid == 0)
 54         {
 55             json_uri = blobmsg_open_array(&b, "prog_list");
 56             for (idx = 0; idx < PROG_MAX; idx++)
 57             {
 58                 if ('' != uri_list[idx].name[0])
 59                 {
 60                     json_list = blobmsg_open_table(&b, NULL);
 61                     blobmsg_add_string(&b, "name", uri_list[idx].name);
 62                     blobmsg_add_u32(&b, "channel", uri_list[idx].chn_id);
 63                     blobmsg_close_table(&b, json_list);
 64                 }
 65             }
 66             blobmsg_close_array(&b, json_uri);
 67             ret = 0;
 68         }
 69         else
 70         {
 71             for (idx = 0; idx < PROG_MAX; idx++)
 72             {
 73                 if ('' != uri_list[idx].name[0] && uri_list[idx].chn_id == chnid)
 74                 {
 75                     blobmsg_add_string(&b, "name", uri_list[idx].name);
 76                     ret = 0;
 77                 }
 78             }
 79         }
 80     }
 81     
 82     blobmsg_add_u32(&b, "result", ret);
 83     ubus_send_reply(ctx, req, b.head);
 84     
 85     return 0;
 86 }
 87  
 88 /* 方法列表 */
 89 static struct ubus_method scan_methods[] = 
 90 {
 91     UBUS_METHOD("scan", ubus_start_scan, scan_policy),
 92 };
 93  
 94 /* type目前没有实际用处 */
 95 static struct ubus_object_type scan_obj_type
 96 = UBUS_OBJECT_TYPE("scan_prog", scan_methods);
 97  
 98 static struct ubus_object scan_obj = 
 99 {
100     .name = "scan_prog", /* 对象的名字 */
101     .type = &scan_obj_type,
102     .methods = scan_methods,
103     .n_methods = ARRAY_SIZE(scan_methods),
104 };
105  
106 static void ubus_add_fd(void)
107 {
108     ubus_add_uloop(ctx);
109  
110 #ifdef FD_CLOEXEC
111     fcntl(ctx->sock.fd, F_SETFD,
112         fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);
113 #endif
114 }
115  
116 static void ubus_reconn_timer(struct uloop_timeout *timeout)
117 {
118     static struct uloop_timeout retry =
119     {
120         .cb = ubus_reconn_timer,
121     };
122     int t = 2;
123  
124     if (ubus_reconnect(ctx, sock_path) != 0) {
125         uloop_timeout_set(&retry, t * 1000);
126         return;
127     }
128  
129     ubus_add_fd();
130 }
131  
132 static void ubus_connection_lost(struct ubus_context *ctx)
133 {
134     ubus_reconn_timer(NULL);
135 }
136  
137 static int display_ubus_init(const char *path)
138 {
139     uloop_init();
140     sock_path = path;
141  
142     ctx = ubus_connect(path);
143     if (!ctx)
144     {
145         printf("ubus connect failed
");
146         return -1;
147     }
148     
149     printf("connected as %08x
", ctx->local_id);
150     ctx->connection_lost = ubus_connection_lost;
151  
152     ubus_add_fd();
153  
154     /* 向ubusd注册对象和方法,供其他ubus客户端调用。 */
155     if (ubus_add_object(ctx, &scan_obj) != 0)
156     {
157         printf("ubus add obj failed
");
158         return -1;
159     }
160  
161     return 0;
162 }
163  
164 static void display_ubus_done(void)
165 {
166     if (ctx)
167         ubus_free(ctx);
168 }
169  
170 int main(int argc, char * argv[])
171 {
172     char * path = NULL;
173     
174     if (-1 == display_ubus_init(path))
175     {
176         printf("ubus connect failed!
");
177         return -1;
178     }
179  
180     uloop_run();
181  
182     display_ubus_done();
183  
184     return 0;
185 }
客户端代码invoke_client.c去调用server端的"scan_prog"对象的"scan"方法:
#include <libubox/uloop.h>
#include <libubox/ustream.h>
#include <libubox/utils.h>
#include <libubus.h>
#include <json/json.h>
#include <libubox/blobmsg_json.h>
#include "ubus_invoke.h"
 
static struct ubus_context * ctx = NULL;
static struct blob_buf b;
static const char * cli_path;
 
enum
{
    SCAN_CHNID,
    SCAN_POLICY_MAX,
};
 
static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = {
    [SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32},
};
 
static int timeout = 30;
static bool simple_output = false;
 
static void scanreq_prog_cb(struct ubus_request *req, int type, struct blob_attr *msg)
{
    char *str;
    if (!msg)
        return;
 
    /* 
    在这里处理返回的消息。
    本例子只是将返回的消息打印出来。
    */
    str = blobmsg_format_json_indent(msg, true, simple_output ? -1 : 0);
    printf("%s
", str);
    free(str);
}
 
static int client_ubus_call()
{
    unsigned int id;
    int ret;
 
    blob_buf_init(&b, 0);
 
    /* 需要传递的参数 */
    blobmsg_add_u32(&b, scan_policy[SCAN_CHNID].name, 0);
 
    /*
    向ubusd查询是否存在"scan_prog"这个对象,
    如果存在,返回其id
    */
    ret = ubus_lookup_id(ctx, "scan_prog", &id);
    if (ret != UBUS_STATUS_OK) {
        printf("lookup scan_prog failed
");
        return ret;
    }
    else {
        printf("lookup scan_prog successs
");
    }
    
    /* 调用"scan_prog"对象的"scan"方法 */
    return ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000);
}
 
/*
ubus_invoke()的声明如下:
int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method,
                struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout);
                
其中cb参数是消息回调函数,其函数类型定义为:
typedef void (*ubus_data_handler_t)(struct ubus_request *req,
                    int type, struct blob_attr *msg);
参数type是请求消息的类型,如UBUS_MSG_INVOKE,UBUS_MSG_DATA等。
参数msg存放从服务端得到的回复消息,struct blob_attr类型,同样用blobmsg_parse()来解析。
参数req保存了请求方的消息属性,其中req->priv即ubus_invoke()中的priv参数,
用这个参数可以零活的传递一些额外的数据。
*/
 
static int client_ubus_init(const char *path)
{
    uloop_init();
    cli_path = path;
 
    ctx = ubus_connect(path);
    if (!ctx)
    {
        printf("ubus connect failed
");
        return -1;
    }
    
    printf("connected as %08x
", ctx->local_id);
 
    return 0;
}
 
static void client_ubus_done(void)
{
    if (ctx)
        ubus_free(ctx);
}
 
int main(int argc, char * argv[])
{
    /* ubusd创建的unix socket,默认值为"/var/run/ubus.sock" */
    char * path = NULL;
 
    /* 连接ubusd */
    if (UBUS_STATUS_OK != client_ubus_init(path))
    {
        printf("ubus connect failed!
");
        return -1;
    }
 
    /* 调用某个ubus方法并处理返回结果 */
    client_ubus_call();
 
    client_ubus_done();
 
    return 0;
}
先执行server程序,再执行client程序,可以看到client发出请求后,server返回了相应的节目号,在client打印出了接收到的消息。
也可以通过shell命令来请求server的服务,例如:
ubus call scan_prog scan '{"chnID": 0}'
这条命令和执行client程序的作用相同。

2. subscribe/notify的方式实现订阅

ubus支持以订阅的方式进行进程间通信,通信方式如下图:

被订阅者(server)又称为notifier,订阅者(client)又称为subscriber。这样一来,可以同时有多个client订阅server的某个服务,当server通过该服务广播消息时,所有client都会被通知,并执行各自的处理。

主要过程为:

server进程:

  1. 连接ubusd。
  2. 注册一个object,用于client订阅。
  3. server可以向订阅者广播消息。
  4. 等待消息。

client进程:

  1. 连接ubusd。
  2. 向server订阅object,并定义收到server的消息时的处理函数。
  3. 等待消息。

代码示例:下面这个例子中,server注册了一个名为“test”的对象,并定期广播消息。而client去订阅这个对象,并对server发出的消息做处理。

notify_server.c:

  

 1 #include <unistd.h>
 2 #include <libubox/blobmsg_json.h>
 3 #include <libubox/uloop.h>
 4 #include <libubus.h>
 5  
 6 static struct ubus_context *ctx;
 7  
 8 static void test_client_subscribe_cb(struct ubus_context *ctx, struct ubus_object *obj)
 9 {
10     fprintf(stderr, "Subscribers active: %d
", obj->has_subscribers);
11 }
12  
13 /* 这个空的method列表,只是为了让object有个名字,这样client可以通过object name来订阅。 */
14 static struct ubus_method test_methods[] = {};
15  
16 static struct ubus_object_type test_obj_type = 
17     UBUS_OBJECT_TYPE("test", test_methods);
18  
19 static struct ubus_object test_object = {
20     .name = "test", /* object的名字 */
21     .type = &test_obj_type,
22     .subscribe_cb = test_client_subscribe_cb,
23 };
24  
25 static void notifier_main(void)
26 {
27     int ret;
28  
29     /* 注册一个object,client可以订阅这个object */
30     ret = ubus_add_object(ctx, &test_object);
31     if (ret) {
32         fprintf(stderr, "Failed to add object: %s
", ubus_strerror(ret));
33         return;
34     }
35  
36     /* 在需要的时候,向所有客户端发送notify消息 */
37     
38     /* step1: 如果需要传递参数,则保存到struct blob_attr类型的结构体中。 */
39  
40     /* 
41     int ubus_notify(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, int timeout);
42     type是一个字符串,自定义的。msg是需要携带的参数。如果需要等待回复,timeout需设置为>=0。
43     */
44     while (1) {
45         sleep(2);
46         /* step2: 广播notification消息。 */
47         ubus_notify(ctx,  &test_object, "say Hi!", NULL, -1);
48     }
49 }
50  
51 int main(int argc, char **argv)
52 {
53     const char *ubus_socket = NULL;
54  
55     uloop_init();
56  
57     ctx = ubus_connect(ubus_socket);
58     if (!ctx) {
59         fprintf(stderr, "Failed to connect to ubus
");
60         return -1;
61     }
62  
63     ubus_add_uloop(ctx);
64  
65     notifier_main();
66     
67     uloop_run();
68  
69     ubus_free(ctx);
70     uloop_done();
71  
72     return 0;
73 }

notify_client.c:客户端订阅“test”对象,在收到3次消息后,随即取消对“test”对象的订阅。

 1 #include <unistd.h>
 2 #include <libubox/blobmsg_json.h>
 3 #include <libubox/uloop.h>
 4 #include <libubus.h>
 5  
 6 static struct ubus_context *ctx;
 7  
 8 static int counter = 0;
 9 static uint32_t obj_id;
10 static struct ubus_subscriber test_event;
11  
12 static int test_notify(struct ubus_context *ctx, struct ubus_object *obj,
13                   struct ubus_request_data *req,
14                   const char *method, struct blob_attr *msg)
15 {
16     printf("notify handler...
");
17     counter++;
18     if (counter > 3)
19         ubus_unsubscribe(ctx, &test_event, obj_id); /* 取消订阅 */
20     return 0;
21 }
22  
23 static void test_handle_remove(struct ubus_context *ctx,
24                       struct ubus_subscriber *obj, uint32_t id)
25 {
26     printf("remove handler...
");
27 }
28  
29 static void subscriber_main(void)
30 {
31     int ret;
32     
33     /* 通知到来时的处理函数。 */
34     test_event.cb = test_notify;
35     test_event.remove_cb = test_handle_remove; //server主动发起删除该client的订阅的cb函数(如server退出的时候)
36  
37     /* 注册test_event */
38     ret = ubus_register_subscriber(ctx, &test_event);
39     if (ret)
40         fprintf(stderr, "Failed to add watch handler: %s
", ubus_strerror(ret));
41     
42     /* 得到要订阅的object的id */
43     ret = ubus_lookup_id(ctx, "test", &obj_id);
44     if (ret)
45         fprintf(stderr, "Failed to lookup object: %s
", ubus_strerror(ret));
46  
47     /* 订阅object */
48     ret = ubus_subscribe(ctx, &test_event, obj_id);
49     if (ret)
50         fprintf(stderr, "Failed to subscribe: %s
", ubus_strerror(ret));
51 }
52  
53 int main(int argc, char **argv)
54 {
55     const char *ubus_socket = NULL;
56  
57     uloop_init();
58  
59     ctx = ubus_connect(ubus_socket);
60     if (!ctx) {
61         fprintf(stderr, "Failed to connect to ubus
");
62         return -1;
63     }
64  
65     ubus_add_uloop(ctx);
66  
67     subscriber_main();
68     
69     uloop_run();
70  
71     ubus_free(ctx);
72     uloop_done();
73  
74     return 0;
75 }

先运行server&,注册可订阅的对象“test”,并随即每2秒向外广播通知消息。这时还没有client订阅这个对象。
运行多个client程序,由于每个client都订阅了“test”对象,则所有client都会收到server发出的消息。当client取消订阅后,则不再收到server端的消息。

event的方式实现事件通知

event机制从一对一的进程之间通信来讲,和invoke机制类似。不过event机制中,发送方不需要知道谁要接收这个消息,实际上就是一个广播消息。这类似于U盘的热插拔:当插上或拔出U盘时,内核会广播一个NETLINK事件,之后内核继续做自己的事情,而不关心谁会去监听和处理这个事件。

下面的例子中,client端同时监听了“add_device”和“rm_device”两个事件,而server端会触发“add_device”事件并携带device的一些信息发送出去。

event_client.c:

  1 #include <libubox/uloop.h>
  2 #include <libubox/ustream.h>
  3 #include <libubox/utils.h>
  4 #include <libubus.h>
  5 #include <json/json.h>
  6 #include <libubox/blobmsg_json.h>
  7  
  8 static struct ubus_context * ctx = NULL;
  9 static const char * cli_path;
 10  
 11 #define    UBUS_EVENT_ADD_DEVICE    "add_device"
 12 #define    UBUS_EVENT_REMOVE_DEVICE    "rm_device"
 13  
 14 static void ubus_probe_device_event(struct ubus_context *ctx, struct ubus_event_handler *ev,
 15               const char *type, struct blob_attr *msg)
 16 {
 17     char *str;
 18  
 19     if (!msg)
 20         return;
 21  
 22     /* 
 23     在这里实现收到事件后的动作。
 24     event也可以传递消息,放在msg中。
 25     
 26     本例子只是将返回的消息打印出来。
 27     */
 28     str = blobmsg_format_json(msg, true);
 29     printf("{ "%s": %s }
", type, str);
 30     free(str);
 31 }
 32  
 33 static int client_ubus_register_events()
 34 {
 35     static struct ubus_event_handler listener;
 36     int ret = 0;
 37  
 38     /* 注册特定event的listener。多个event可以使用同一个listener */
 39     memset(&listener, 0, sizeof(listener));
 40     listener.cb = ubus_probe_device_event;
 41     
 42     ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_ADD_DEVICE);
 43     if (ret)
 44     {
 45         fprintf(stderr, "register event failed.
");
 46         return -1;
 47     }
 48  
 49     ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_REMOVE_DEVICE);
 50     if (ret)
 51     {
 52         ubus_unregister_event_handler(ctx, &listener);
 53         fprintf(stderr, "register event failed.
");
 54         return -1;
 55     }
 56  
 57     return 0;
 58 }
 59  
 60 static void ubus_add_fd(void)
 61 {
 62     ubus_add_uloop(ctx);
 63  
 64 #ifdef FD_CLOEXEC
 65     fcntl(ctx->sock.fd, F_SETFD,
 66         fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);
 67 #endif
 68 }
 69  
 70 static void ubus_reconn_timer(struct uloop_timeout *timeout)
 71 {
 72     static struct uloop_timeout retry =
 73     {
 74         .cb = ubus_reconn_timer,
 75     };
 76     int t = 2;
 77  
 78     if (ubus_reconnect(ctx, cli_path) != 0) {
 79         uloop_timeout_set(&retry, t * 1000);
 80         return;
 81     }
 82  
 83     ubus_add_fd();
 84 }
 85  
 86 static void ubus_connection_lost(struct ubus_context *ctx)
 87 {
 88     ubus_reconn_timer(NULL);
 89 }
 90  
 91 static int client_ubus_init(const char *path)
 92 {
 93     uloop_init();
 94     cli_path = path;
 95     
 96     ctx = ubus_connect(path);
 97     if (!ctx)
 98     {
 99         printf("ubus connect failed
");
100         return -1;
101     }
102     
103     printf("connected as %08x
", ctx->local_id);
104     ctx->connection_lost = ubus_connection_lost;
105  
106     ubus_add_fd();
107  
108     return 0;
109 }
110  
111 static void client_ubus_done(void)
112 {
113     if (ctx)
114         ubus_free(ctx);
115 }
116  
117 int main(int argc, char * argv[])
118 {
119     char * path = NULL;
120  
121     /* 连接ubusd */
122     if (UBUS_STATUS_OK != client_ubus_init(path))
123     {
124         printf("ubus connect failed!
");
125         return -1;
126     }
127  
128     /* 注册某个事件的处理函数 */
129     client_ubus_register_events();
130  
131     uloop_run();
132     
133     client_ubus_done();
134  
135     return 0;
136 }

event_server.c:

 1 #include <libubox/uloop.h>
 2 #include <libubox/ustream.h>
 3 #include <libubox/utils.h>
 4 #include <libubus.h>
 5 #include <json/json.h>
 6 #include <libubox/blobmsg_json.h>
 7  
 8 static struct ubus_context * ctx = NULL;
 9 static struct blob_buf b;
10 static const char * sock_path;
11  
12 static int server_ubus_send_event(void)
13 {
14     blob_buf_init(&b, 0);
15  
16     /* 需要传递的参数 */
17     blobmsg_add_u32(&b, "major", 3);
18     blobmsg_add_u32(&b, "minor", 56);
19     blobmsg_add_string(&b, "name", "mmc01");
20  
21     /* 广播名为"add_device"的事件 */
22     return ubus_send_event(ctx, "add_device", b.head);
23 }
24  
25 static int display_ubus_init(const char *path)
26 {
27     uloop_init();
28     sock_path = path;
29  
30     ctx = ubus_connect(path);
31     if (!ctx)
32     {
33         printf("ubus connect failed
");
34         return -1;
35     }
36     
37     printf("connected as %08x
", ctx->local_id);
38  
39     return 0;
40 }
41  
42 static void display_ubus_done(void)
43 {
44     if (ctx)
45         ubus_free(ctx);
46 }
47  
48 int main(int argc, char * argv[])
49 {
50     char * path = NULL;
51     
52     if (-1 == display_ubus_init(path))
53     {
54         printf("ubus connect failed!
");
55         return -1;
56     }
57  
58     server_ubus_send_event();
59  
60     display_ubus_done();
61  
62     return 0;
63 }

先运行client &注册事件。我们同时启动两个client程序。
再执行server主动触发"add_device"事件,则凡是注册了这个事件的client都会收到该事件并执行各自的处理。

root@NVR:~# ./server
connected as fdecbdc1
{ "add_device": { "major": 3, "minor": 56, "name": "mmc01" } }
{ "add_device": { "major": 3, "minor": 56, "name": "mmc01" } }

也可以通过命令行的ubus send命令触发事件:

root@NVR:~# ubus send "rm_device" '{ "major": 3, "minor": 56, "name": "mmc01" }'
{ "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } }
{ "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } }

在使用ubus时,可根据实际的场景来选择用哪种方式进行进程间通信。如之前所说,ubus是为发送消息而设计的,不合适传输大量数据

原文地址:https://www.cnblogs.com/LiuYanYGZ/p/14204802.html