watch add/remove -> subscribe/unsubscribe:
authorFelix Fietkau <nbd@openwrt.org>
Thu, 13 Dec 2012 17:44:15 +0000 (18:44 +0100)
committerFelix Fietkau <nbd@openwrt.org>
Thu, 13 Dec 2012 18:35:38 +0000 (19:35 +0100)
rename the ADD_WATCH/REMOVE_WATCH messages to SUBSCRIBE/UNSUBSCRIBE and change
the message format and libubus API in preparation for adding object notifications

Signed-off-by: Felix Fietkau <nbd@openwrt.org>
CMakeLists.txt
examples/server.c
libubus-internal.h
libubus-sub.c [new file with mode: 0644]
libubus.c
libubus.h
ubusd_obj.c
ubusd_obj.h
ubusd_proto.c
ubusmsg.h

index a2c4101dd6c0f3fa93220869c517769783cbb639..b32cbb9fc38cac31adfd08db7a47025147e1848c 100644 (file)
@@ -12,7 +12,7 @@ IF(APPLE)
   LINK_DIRECTORIES(/opt/local/lib)
 ENDIF()
 
-ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c)
+ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c libubus-sub.c)
 TARGET_LINK_LIBRARIES(ubus ubox)
 
 ADD_EXECUTABLE(ubusd ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c)
index 97789d5b6b203321248cf7e859e1fc04376466e0..529059cb9272af104debde3ad8ad45fe3fc380f0 100644 (file)
@@ -16,7 +16,7 @@
 #include "libubus.h"
 
 static struct ubus_context *ctx;
-static struct ubus_watch_object test_event;
+static struct ubus_subscriber test_event;
 static struct blob_buf b;
 
 enum {
@@ -79,8 +79,9 @@ static const struct blobmsg_policy watch_policy[__WATCH_MAX] = {
        [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
 };
 
-static void test_handle_event(struct ubus_context *ctx, struct ubus_watch_object *w,
-                       uint32_t id)
+static void
+test_handle_remove(struct ubus_context *ctx, struct ubus_subscriber *s,
+                   uint32_t id)
 {
        fprintf(stderr, "Object %08x went away\n", id);
 }
@@ -96,8 +97,8 @@ static int test_watch(struct ubus_context *ctx, struct ubus_object *obj,
        if (!tb[WATCH_ID])
                return UBUS_STATUS_INVALID_ARGUMENT;
 
-       test_event.cb = test_handle_event;
-       ret = ubus_watch_object_add(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID]));
+       test_event.remove_cb = test_handle_remove;
+       ret = ubus_subscribe(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID]));
        fprintf(stderr, "Watching object %08x: %s\n", blobmsg_get_u32(tb[WATCH_ID]), ubus_strerror(ret));
        return ret;
 }
@@ -125,7 +126,7 @@ static void server_main(void)
        if (ret)
                fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret));
 
-       ret = ubus_register_watch_object(ctx, &test_event);
+       ret = ubus_register_subscriber(ctx, &test_event);
        if (ret)
                fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret));
 
index b58fb1d4c2f5730463eac69895aead623c046200..f3e2a7393558fdc0002bd2930c7088f601ea2f23 100644 (file)
@@ -26,5 +26,6 @@ void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr);
 void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr);
 int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req,
                                struct blob_attr *msg, int cmd, uint32_t peer);
+void ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr);
 
 #endif
diff --git a/libubus-sub.c b/libubus-sub.c
new file mode 100644 (file)
index 0000000..2bfb483
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2011-2012 Felix Fietkau <nbd@openwrt.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License version 2.1
+ * as published by the Free Software Foundation
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include "libubus.h"
+#include "libubus-internal.h"
+
+static int ubus_subscriber_cb(struct ubus_context *ctx, struct ubus_object *obj,
+                        struct ubus_request_data *req,
+                        const char *method, struct blob_attr *msg)
+{
+       struct ubus_subscriber *s;
+
+       s = container_of(obj, struct ubus_subscriber, obj);
+       s->cb(ctx, obj, req, method, msg);
+       return 0;
+}
+
+static const struct ubus_method watch_method = {
+       .name = NULL,
+       .handler = ubus_subscriber_cb,
+};
+
+int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *s)
+{
+       struct ubus_object *obj = &s->obj;
+
+       obj->methods = &watch_method;
+       obj->n_methods = 1;
+
+       return ubus_add_object(ctx, obj);
+}
+
+static int
+__ubus_subscribe_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type)
+{
+       struct ubus_request req;
+
+       blob_buf_init(&b, 0);
+       blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
+       blob_put_int32(&b, UBUS_ATTR_TARGET, id);
+       if (method)
+               blob_put_string(&b, UBUS_ATTR_METHOD, method);
+
+       if (ubus_start_request(ctx, &req, b.head, type, 0) < 0)
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
+       return ubus_complete_request(ctx, &req, 0);
+
+}
+
+int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id)
+{
+       return __ubus_subscribe_request(ctx, &obj->obj, id, "event", UBUS_MSG_SUBSCRIBE);
+}
+
+int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id)
+{
+       return __ubus_subscribe_request(ctx, &obj->obj, id, NULL, UBUS_MSG_UNSUBSCRIBE);
+}
+
+void __hidden ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr)
+{
+       struct ubus_subscriber *s;
+       struct blob_attr **attrbuf;
+       struct ubus_object *obj;
+       uint32_t objid;
+
+       attrbuf = ubus_parse_msg(hdr->data);
+       if (!attrbuf[UBUS_ATTR_OBJID] || !attrbuf[UBUS_ATTR_TARGET])
+               return;
+
+       objid = blob_get_u32(attrbuf[UBUS_ATTR_OBJID]);
+       obj = avl_find_element(&ctx->objects, &objid, obj, avl);
+       if (!obj)
+               return;
+
+       if (obj->methods != &watch_method)
+               return;
+
+       s = container_of(obj, struct ubus_subscriber, obj);
+       s->remove_cb(ctx, s, blob_get_u32(attrbuf[UBUS_ATTR_TARGET]));
+}
+
+
index bcaf63dc377f4a158a1cda98b70997f69e077ff4..03899d6db44e766513053e0eb65a5acee27cc6ee 100644 (file)
--- a/libubus.c
+++ b/libubus.c
@@ -228,6 +228,10 @@ void __hidden ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr
                        ubus_process_invoke(ctx, hdr);
                }
                break;
+
+       case UBUS_MSG_UNSUBSCRIBE:
+               ubus_process_unsubscribe(ctx, hdr);
+               break;
        }
 }
 
@@ -514,81 +518,6 @@ int ubus_register_event_handler(struct ubus_context *ctx,
                          NULL, NULL, 0);
 }
 
-enum {
-       WATCH_ID,
-       WATCH_NOTIFY,
-       __WATCH_MAX
-};
-
-static const struct blobmsg_policy watch_policy[] = {
-       [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
-       [WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING },
-};
-
-
-static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj,
-                        struct ubus_request_data *req,
-                        const char *method, struct blob_attr *msg)
-{
-       struct ubus_watch_object *w;
-       struct blob_attr *tb[__WATCH_MAX];
-
-       blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg));
-
-       if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY])
-               return UBUS_STATUS_INVALID_ARGUMENT;
-
-       if (req->peer)
-               return UBUS_STATUS_INVALID_ARGUMENT;
-
-       w = container_of(obj, struct ubus_watch_object, obj);
-       w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID]));
-       return 0;
-}
-
-static const struct ubus_method watch_method = {
-       .name = NULL,
-       .handler = ubus_watch_cb,
-};
-
-int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj)
-{
-       struct ubus_object *obj = &w_obj->obj;
-
-       obj->methods = &watch_method;
-       obj->n_methods = 1;
-
-       return ubus_add_object(ctx, obj);
-}
-
-static int
-__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type)
-{
-       struct ubus_request req;
-
-       blob_buf_init(&b, 0);
-       blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
-       blob_put_int32(&b, UBUS_ATTR_TARGET, id);
-       if (method)
-               blob_put_string(&b, UBUS_ATTR_METHOD, method);
-
-       if (ubus_start_request(ctx, &req, b.head, type, 0) < 0)
-               return UBUS_STATUS_INVALID_ARGUMENT;
-
-       return ubus_complete_request(ctx, &req, 0);
-
-}
-
-int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id)
-{
-       return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH);
-}
-
-int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id)
-{
-       return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH);
-}
-
 int ubus_send_event(struct ubus_context *ctx, const char *id,
                    struct blob_attr *data)
 {
index 59d981e7141ec6b80ecb63bce52a7729833d15ea..17c49527095dc09084daf6b2e458a496e22f87b7 100644 (file)
--- a/libubus.h
+++ b/libubus.h
@@ -29,7 +29,7 @@ struct ubus_request;
 struct ubus_request_data;
 struct ubus_object_data;
 struct ubus_event_handler;
-struct ubus_watch_object;
+struct ubus_subscriber;
 
 typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx,
                                      struct ubus_object_data *obj,
@@ -37,10 +37,10 @@ typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx,
 typedef int (*ubus_handler_t)(struct ubus_context *ctx, struct ubus_object *obj,
                              struct ubus_request_data *req,
                              const char *method, struct blob_attr *msg);
+typedef void (*ubus_remove_handler_t)(struct ubus_context *ctx,
+                                     struct ubus_subscriber *obj, uint32_t id);
 typedef void (*ubus_event_handler_t)(struct ubus_context *ctx, struct ubus_event_handler *ev,
                                     const char *type, struct blob_attr *msg);
-typedef void (*ubus_watch_handler_t)(struct ubus_context *ctx, struct ubus_watch_object *w,
-                                    uint32_t id);
 typedef void (*ubus_data_handler_t)(struct ubus_request *req,
                                    int type, struct blob_attr *msg);
 typedef void (*ubus_complete_handler_t)(struct ubus_request *req, int ret);
@@ -90,10 +90,11 @@ struct ubus_object {
        int n_methods;
 };
 
-struct ubus_watch_object {
+struct ubus_subscriber {
        struct ubus_object obj;
 
-       ubus_watch_handler_t cb;
+       ubus_handler_t cb;
+       ubus_remove_handler_t remove_cb;
 };
 
 struct ubus_event_handler {
@@ -199,12 +200,10 @@ int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj);
 /* remove the object from the ubus connection */
 int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj);
 
-/* add an object for watching other object state changes */
-int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *obj);
-
-int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id);
-
-int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id);
+/* add a subscriber notifications from another object */
+int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *obj);
+int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id);
+int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id);
 
 /* ----------- rpc ----------- */
 
index 7ae9b5f63174ba6cadb5e5d9c1920822a00cccfd..8b1b18f8fe4e14de302882900d22b3d088204d15 100644 (file)
@@ -112,8 +112,8 @@ struct ubus_object *ubusd_create_object_internal(struct ubus_object_type *type,
        obj->type = type;
        INIT_LIST_HEAD(&obj->list);
        INIT_LIST_HEAD(&obj->events);
-       INIT_LIST_HEAD(&obj->watchers);
-       INIT_LIST_HEAD(&obj->watched);
+       INIT_LIST_HEAD(&obj->subscribers);
+       INIT_LIST_HEAD(&obj->target_list);
        if (type)
                type->refcount++;
 
@@ -164,37 +164,37 @@ free:
        return NULL;
 }
 
-void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method)
+void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method)
 {
-       struct ubus_watch *w;
+       struct ubus_subscription *s;
 
-       w = calloc(1, sizeof(*w) + strlen(method) + 1);
-       if (!w)
+       s = calloc(1, sizeof(*s) + strlen(method) + 1);
+       if (!s)
                return;
 
-       w->watcher = obj;
-       w->watched = target;
-       list_add(&w->watcher_list, &target->watchers);
-       list_add(&w->watched_list, &obj->watched);
-       strcpy(w->method, method);
+       s->subscriber = obj;
+       s->target = target;
+       list_add(&s->list, &target->subscribers);
+       list_add(&s->target_list, &obj->target_list);
+       strcpy(s->method, method);
 }
 
-void ubus_watch_free(struct ubus_watch *w)
+void ubus_unsubscribe(struct ubus_subscription *s)
 {
-       list_del(&w->watcher_list);
-       list_del(&w->watched_list);
-       free(w);
+       list_del(&s->list);
+       list_del(&s->target_list);
+       free(s);
 }
 
 void ubusd_free_object(struct ubus_object *obj)
 {
-       struct ubus_watch *w, *tmp;
+       struct ubus_subscription *s, *tmp;
 
-       list_for_each_entry_safe(w, tmp, &obj->watched, watched_list) {
-               ubus_watch_free(w);
+       list_for_each_entry_safe(s, tmp, &obj->target_list, target_list) {
+               ubus_unsubscribe(s);
        }
-       list_for_each_entry_safe(w, tmp, &obj->watchers, watcher_list) {
-               ubus_proto_notify_watch(w);
+       list_for_each_entry_safe(s, tmp, &obj->subscribers, list) {
+               ubus_notify_unsubscribe(s);
        }
 
        ubusd_event_cleanup_object(obj);
index d0573f057cd0ab6227d2e7a623d683c94284085a..5a82e8dfccefebfb980a395c75272ff8aae7c685 100644 (file)
@@ -35,9 +35,9 @@ struct ubus_method {
        struct blob_attr data[];
 };
 
-struct ubus_watch {
-       struct list_head watcher_list, watched_list;
-       struct ubus_object *watcher, *watched;
+struct ubus_subscription {
+       struct list_head list, target_list;
+       struct ubus_object *subscriber, *target;
        char method[];
 };
 
@@ -47,7 +47,7 @@ struct ubus_object {
 
        struct list_head events;
 
-       struct list_head watchers, watched;
+       struct list_head subscribers, target_list;
 
        struct ubus_object_type *type;
        struct avl_node path;
@@ -76,8 +76,8 @@ static inline struct ubus_object *ubusd_find_object(uint32_t objid)
        return obj;
 }
 
-void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method);
-void ubus_watch_free(struct ubus_watch *w);
-void ubus_proto_notify_watch(struct ubus_watch *w);
+void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method);
+void ubus_unsubscribe(struct ubus_subscription *s);
+void ubus_notify_unsubscribe(struct ubus_subscription *s);
 
 #endif
index f461e47f781ec43b555adbe6fa8f4f41382c916f..59920ed7f9a8c79355244a26b0bd92a5e9d4ed50 100644 (file)
@@ -308,14 +308,14 @@ static int ubusd_handle_add_watch(struct ubus_client *cl, struct ubus_msg_buf *u
        if (cl == target->client)
                return UBUS_STATUS_INVALID_ARGUMENT;
 
-       ubus_watch_new(obj, target, blob_data(attr[UBUS_ATTR_METHOD]));
+       ubus_subscribe(obj, target, blob_data(attr[UBUS_ATTR_METHOD]));
        return 0;
 }
 
 static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
 {
        struct ubus_object *obj;
-       struct ubus_watch *w;
+       struct ubus_subscription *s;
        uint32_t id;
 
        if (!attr[UBUS_ATTR_OBJID] || !attr[UBUS_ATTR_TARGET])
@@ -329,11 +329,11 @@ static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf
                return UBUS_STATUS_INVALID_ARGUMENT;
 
        id = blob_get_u32(attr[UBUS_ATTR_TARGET]);
-       list_for_each_entry(w, &obj->watched, watched_list) {
-               if (w->watched->id.id != id)
+       list_for_each_entry(s, &obj->target_list, target_list) {
+               if (s->target->id.id != id)
                        continue;
 
-               ubus_watch_free(w);
+               ubus_unsubscribe(s);
                return 0;
        }
 
@@ -348,8 +348,8 @@ static const ubus_cmd_cb handlers[__UBUS_MSG_LAST] = {
        [UBUS_MSG_INVOKE] = ubusd_handle_invoke,
        [UBUS_MSG_STATUS] = ubusd_handle_response,
        [UBUS_MSG_DATA] = ubusd_handle_response,
-       [UBUS_MSG_ADD_WATCH] = ubusd_handle_add_watch,
-       [UBUS_MSG_REMOVE_WATCH] = ubusd_handle_remove_watch,
+       [UBUS_MSG_SUBSCRIBE] = ubusd_handle_add_watch,
+       [UBUS_MSG_UNSUBSCRIBE] = ubusd_handle_remove_watch,
 };
 
 void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub)
@@ -416,26 +416,19 @@ void ubusd_proto_free_client(struct ubus_client *cl)
        ubus_free_id(&clients, &cl->id);
 }
 
-void ubus_proto_notify_watch(struct ubus_watch *w)
+void ubus_notify_unsubscribe(struct ubus_subscription *s)
 {
        struct ubus_msg_buf *ub;
-       void *data;
 
        blob_buf_init(&b, 0);
-       blob_put_int32(&b, UBUS_ATTR_OBJID, w->watcher->id.id);
-       blob_put_string(&b, UBUS_ATTR_METHOD, w->method);
-
-       data = blob_nest_start(&b, UBUS_ATTR_DATA);
-       blobmsg_add_string(&b, "notify", "remove");
-       blobmsg_add_u32(&b, "id", w->watched->id.id);
-       blobmsg_add_u32(&b, "peer", w->watched->client->id.id);
-       blob_nest_end(&b, data);
+       blob_put_int32(&b, UBUS_ATTR_OBJID, s->subscriber->id.id);
+       blob_put_int32(&b, UBUS_ATTR_TARGET, s->target->id.id);
 
        ub = ubus_msg_from_blob(false);
-       ubus_msg_init(ub, UBUS_MSG_INVOKE, ++w->watcher->invoke_seq, 0);
-       ubus_msg_send(w->watcher->client, ub, true);
+       ubus_msg_init(ub, UBUS_MSG_UNSUBSCRIBE, ++s->subscriber->invoke_seq, 0);
+       ubus_msg_send(s->subscriber->client, ub, true);
 
-       ubus_watch_free(w);
+       ubus_unsubscribe(s);
 }
 
 static void __init ubusd_proto_init(void)
index 10f84db2045f99f2532beea2e7ab65c5426a9dcf..833d7bff17afbe9fda3f9dc2bb7e889764a0f76f 100644 (file)
--- a/ubusmsg.h
+++ b/ubusmsg.h
@@ -54,9 +54,13 @@ enum ubus_msg_type {
        UBUS_MSG_ADD_OBJECT,
        UBUS_MSG_REMOVE_OBJECT,
 
-       /* watch an object, notify on remove */
-       UBUS_MSG_ADD_WATCH,
-       UBUS_MSG_REMOVE_WATCH,
+       /*
+        * subscribe/unsubscribe to object notifications
+        * The unsubscribe message is sent from ubusd when
+        * the object disappears
+        */
+       UBUS_MSG_SUBSCRIBE,
+       UBUS_MSG_UNSUBSCRIBE,
 
        /* must be last */
        __UBUS_MSG_LAST,