LINK_DIRECTORIES(/opt/local/lib)
ENDIF()
-ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c)
+ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c libubus-sub.c)
TARGET_LINK_LIBRARIES(ubus ubox)
ADD_EXECUTABLE(ubusd ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c)
#include "libubus.h"
static struct ubus_context *ctx;
-static struct ubus_watch_object test_event;
+static struct ubus_subscriber test_event;
static struct blob_buf b;
enum {
[WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
};
-static void test_handle_event(struct ubus_context *ctx, struct ubus_watch_object *w,
- uint32_t id)
+static void
+test_handle_remove(struct ubus_context *ctx, struct ubus_subscriber *s,
+ uint32_t id)
{
fprintf(stderr, "Object %08x went away\n", id);
}
if (!tb[WATCH_ID])
return UBUS_STATUS_INVALID_ARGUMENT;
- test_event.cb = test_handle_event;
- ret = ubus_watch_object_add(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID]));
+ test_event.remove_cb = test_handle_remove;
+ ret = ubus_subscribe(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID]));
fprintf(stderr, "Watching object %08x: %s\n", blobmsg_get_u32(tb[WATCH_ID]), ubus_strerror(ret));
return ret;
}
if (ret)
fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret));
- ret = ubus_register_watch_object(ctx, &test_event);
+ ret = ubus_register_subscriber(ctx, &test_event);
if (ret)
fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret));
void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr);
int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req,
struct blob_attr *msg, int cmd, uint32_t peer);
+void ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr);
#endif
--- /dev/null
+/*
+ * Copyright (C) 2011-2012 Felix Fietkau <nbd@openwrt.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License version 2.1
+ * as published by the Free Software Foundation
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "libubus.h"
+#include "libubus-internal.h"
+
+static int ubus_subscriber_cb(struct ubus_context *ctx, struct ubus_object *obj,
+ struct ubus_request_data *req,
+ const char *method, struct blob_attr *msg)
+{
+ struct ubus_subscriber *s;
+
+ s = container_of(obj, struct ubus_subscriber, obj);
+ s->cb(ctx, obj, req, method, msg);
+ return 0;
+}
+
+static const struct ubus_method watch_method = {
+ .name = NULL,
+ .handler = ubus_subscriber_cb,
+};
+
+int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *s)
+{
+ struct ubus_object *obj = &s->obj;
+
+ obj->methods = &watch_method;
+ obj->n_methods = 1;
+
+ return ubus_add_object(ctx, obj);
+}
+
+static int
+__ubus_subscribe_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type)
+{
+ struct ubus_request req;
+
+ blob_buf_init(&b, 0);
+ blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
+ blob_put_int32(&b, UBUS_ATTR_TARGET, id);
+ if (method)
+ blob_put_string(&b, UBUS_ATTR_METHOD, method);
+
+ if (ubus_start_request(ctx, &req, b.head, type, 0) < 0)
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
+ return ubus_complete_request(ctx, &req, 0);
+
+}
+
+int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id)
+{
+ return __ubus_subscribe_request(ctx, &obj->obj, id, "event", UBUS_MSG_SUBSCRIBE);
+}
+
+int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id)
+{
+ return __ubus_subscribe_request(ctx, &obj->obj, id, NULL, UBUS_MSG_UNSUBSCRIBE);
+}
+
+void __hidden ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr)
+{
+ struct ubus_subscriber *s;
+ struct blob_attr **attrbuf;
+ struct ubus_object *obj;
+ uint32_t objid;
+
+ attrbuf = ubus_parse_msg(hdr->data);
+ if (!attrbuf[UBUS_ATTR_OBJID] || !attrbuf[UBUS_ATTR_TARGET])
+ return;
+
+ objid = blob_get_u32(attrbuf[UBUS_ATTR_OBJID]);
+ obj = avl_find_element(&ctx->objects, &objid, obj, avl);
+ if (!obj)
+ return;
+
+ if (obj->methods != &watch_method)
+ return;
+
+ s = container_of(obj, struct ubus_subscriber, obj);
+ s->remove_cb(ctx, s, blob_get_u32(attrbuf[UBUS_ATTR_TARGET]));
+}
+
+
ubus_process_invoke(ctx, hdr);
}
break;
+
+ case UBUS_MSG_UNSUBSCRIBE:
+ ubus_process_unsubscribe(ctx, hdr);
+ break;
}
}
NULL, NULL, 0);
}
-enum {
- WATCH_ID,
- WATCH_NOTIFY,
- __WATCH_MAX
-};
-
-static const struct blobmsg_policy watch_policy[] = {
- [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
- [WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING },
-};
-
-
-static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj,
- struct ubus_request_data *req,
- const char *method, struct blob_attr *msg)
-{
- struct ubus_watch_object *w;
- struct blob_attr *tb[__WATCH_MAX];
-
- blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg));
-
- if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY])
- return UBUS_STATUS_INVALID_ARGUMENT;
-
- if (req->peer)
- return UBUS_STATUS_INVALID_ARGUMENT;
-
- w = container_of(obj, struct ubus_watch_object, obj);
- w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID]));
- return 0;
-}
-
-static const struct ubus_method watch_method = {
- .name = NULL,
- .handler = ubus_watch_cb,
-};
-
-int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj)
-{
- struct ubus_object *obj = &w_obj->obj;
-
- obj->methods = &watch_method;
- obj->n_methods = 1;
-
- return ubus_add_object(ctx, obj);
-}
-
-static int
-__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type)
-{
- struct ubus_request req;
-
- blob_buf_init(&b, 0);
- blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
- blob_put_int32(&b, UBUS_ATTR_TARGET, id);
- if (method)
- blob_put_string(&b, UBUS_ATTR_METHOD, method);
-
- if (ubus_start_request(ctx, &req, b.head, type, 0) < 0)
- return UBUS_STATUS_INVALID_ARGUMENT;
-
- return ubus_complete_request(ctx, &req, 0);
-
-}
-
-int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id)
-{
- return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH);
-}
-
-int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id)
-{
- return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH);
-}
-
int ubus_send_event(struct ubus_context *ctx, const char *id,
struct blob_attr *data)
{
struct ubus_request_data;
struct ubus_object_data;
struct ubus_event_handler;
-struct ubus_watch_object;
+struct ubus_subscriber;
typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx,
struct ubus_object_data *obj,
typedef int (*ubus_handler_t)(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req,
const char *method, struct blob_attr *msg);
+typedef void (*ubus_remove_handler_t)(struct ubus_context *ctx,
+ struct ubus_subscriber *obj, uint32_t id);
typedef void (*ubus_event_handler_t)(struct ubus_context *ctx, struct ubus_event_handler *ev,
const char *type, struct blob_attr *msg);
-typedef void (*ubus_watch_handler_t)(struct ubus_context *ctx, struct ubus_watch_object *w,
- uint32_t id);
typedef void (*ubus_data_handler_t)(struct ubus_request *req,
int type, struct blob_attr *msg);
typedef void (*ubus_complete_handler_t)(struct ubus_request *req, int ret);
int n_methods;
};
-struct ubus_watch_object {
+struct ubus_subscriber {
struct ubus_object obj;
- ubus_watch_handler_t cb;
+ ubus_handler_t cb;
+ ubus_remove_handler_t remove_cb;
};
struct ubus_event_handler {
/* remove the object from the ubus connection */
int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj);
-/* add an object for watching other object state changes */
-int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *obj);
-
-int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id);
-
-int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id);
+/* add a subscriber notifications from another object */
+int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *obj);
+int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id);
+int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id);
/* ----------- rpc ----------- */
obj->type = type;
INIT_LIST_HEAD(&obj->list);
INIT_LIST_HEAD(&obj->events);
- INIT_LIST_HEAD(&obj->watchers);
- INIT_LIST_HEAD(&obj->watched);
+ INIT_LIST_HEAD(&obj->subscribers);
+ INIT_LIST_HEAD(&obj->target_list);
if (type)
type->refcount++;
return NULL;
}
-void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method)
+void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method)
{
- struct ubus_watch *w;
+ struct ubus_subscription *s;
- w = calloc(1, sizeof(*w) + strlen(method) + 1);
- if (!w)
+ s = calloc(1, sizeof(*s) + strlen(method) + 1);
+ if (!s)
return;
- w->watcher = obj;
- w->watched = target;
- list_add(&w->watcher_list, &target->watchers);
- list_add(&w->watched_list, &obj->watched);
- strcpy(w->method, method);
+ s->subscriber = obj;
+ s->target = target;
+ list_add(&s->list, &target->subscribers);
+ list_add(&s->target_list, &obj->target_list);
+ strcpy(s->method, method);
}
-void ubus_watch_free(struct ubus_watch *w)
+void ubus_unsubscribe(struct ubus_subscription *s)
{
- list_del(&w->watcher_list);
- list_del(&w->watched_list);
- free(w);
+ list_del(&s->list);
+ list_del(&s->target_list);
+ free(s);
}
void ubusd_free_object(struct ubus_object *obj)
{
- struct ubus_watch *w, *tmp;
+ struct ubus_subscription *s, *tmp;
- list_for_each_entry_safe(w, tmp, &obj->watched, watched_list) {
- ubus_watch_free(w);
+ list_for_each_entry_safe(s, tmp, &obj->target_list, target_list) {
+ ubus_unsubscribe(s);
}
- list_for_each_entry_safe(w, tmp, &obj->watchers, watcher_list) {
- ubus_proto_notify_watch(w);
+ list_for_each_entry_safe(s, tmp, &obj->subscribers, list) {
+ ubus_notify_unsubscribe(s);
}
ubusd_event_cleanup_object(obj);
struct blob_attr data[];
};
-struct ubus_watch {
- struct list_head watcher_list, watched_list;
- struct ubus_object *watcher, *watched;
+struct ubus_subscription {
+ struct list_head list, target_list;
+ struct ubus_object *subscriber, *target;
char method[];
};
struct list_head events;
- struct list_head watchers, watched;
+ struct list_head subscribers, target_list;
struct ubus_object_type *type;
struct avl_node path;
return obj;
}
-void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method);
-void ubus_watch_free(struct ubus_watch *w);
-void ubus_proto_notify_watch(struct ubus_watch *w);
+void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method);
+void ubus_unsubscribe(struct ubus_subscription *s);
+void ubus_notify_unsubscribe(struct ubus_subscription *s);
#endif
if (cl == target->client)
return UBUS_STATUS_INVALID_ARGUMENT;
- ubus_watch_new(obj, target, blob_data(attr[UBUS_ATTR_METHOD]));
+ ubus_subscribe(obj, target, blob_data(attr[UBUS_ATTR_METHOD]));
return 0;
}
static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
{
struct ubus_object *obj;
- struct ubus_watch *w;
+ struct ubus_subscription *s;
uint32_t id;
if (!attr[UBUS_ATTR_OBJID] || !attr[UBUS_ATTR_TARGET])
return UBUS_STATUS_INVALID_ARGUMENT;
id = blob_get_u32(attr[UBUS_ATTR_TARGET]);
- list_for_each_entry(w, &obj->watched, watched_list) {
- if (w->watched->id.id != id)
+ list_for_each_entry(s, &obj->target_list, target_list) {
+ if (s->target->id.id != id)
continue;
- ubus_watch_free(w);
+ ubus_unsubscribe(s);
return 0;
}
[UBUS_MSG_INVOKE] = ubusd_handle_invoke,
[UBUS_MSG_STATUS] = ubusd_handle_response,
[UBUS_MSG_DATA] = ubusd_handle_response,
- [UBUS_MSG_ADD_WATCH] = ubusd_handle_add_watch,
- [UBUS_MSG_REMOVE_WATCH] = ubusd_handle_remove_watch,
+ [UBUS_MSG_SUBSCRIBE] = ubusd_handle_add_watch,
+ [UBUS_MSG_UNSUBSCRIBE] = ubusd_handle_remove_watch,
};
void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub)
ubus_free_id(&clients, &cl->id);
}
-void ubus_proto_notify_watch(struct ubus_watch *w)
+void ubus_notify_unsubscribe(struct ubus_subscription *s)
{
struct ubus_msg_buf *ub;
- void *data;
blob_buf_init(&b, 0);
- blob_put_int32(&b, UBUS_ATTR_OBJID, w->watcher->id.id);
- blob_put_string(&b, UBUS_ATTR_METHOD, w->method);
-
- data = blob_nest_start(&b, UBUS_ATTR_DATA);
- blobmsg_add_string(&b, "notify", "remove");
- blobmsg_add_u32(&b, "id", w->watched->id.id);
- blobmsg_add_u32(&b, "peer", w->watched->client->id.id);
- blob_nest_end(&b, data);
+ blob_put_int32(&b, UBUS_ATTR_OBJID, s->subscriber->id.id);
+ blob_put_int32(&b, UBUS_ATTR_TARGET, s->target->id.id);
ub = ubus_msg_from_blob(false);
- ubus_msg_init(ub, UBUS_MSG_INVOKE, ++w->watcher->invoke_seq, 0);
- ubus_msg_send(w->watcher->client, ub, true);
+ ubus_msg_init(ub, UBUS_MSG_UNSUBSCRIBE, ++s->subscriber->invoke_seq, 0);
+ ubus_msg_send(s->subscriber->client, ub, true);
- ubus_watch_free(w);
+ ubus_unsubscribe(s);
}
static void __init ubusd_proto_init(void)
UBUS_MSG_ADD_OBJECT,
UBUS_MSG_REMOVE_OBJECT,
- /* watch an object, notify on remove */
- UBUS_MSG_ADD_WATCH,
- UBUS_MSG_REMOVE_WATCH,
+ /*
+ * subscribe/unsubscribe to object notifications
+ * The unsubscribe message is sent from ubusd when
+ * the object disappears
+ */
+ UBUS_MSG_SUBSCRIBE,
+ UBUS_MSG_UNSUBSCRIBE,
/* must be last */
__UBUS_MSG_LAST,