From 976719d89ca58cb6d393ded3a8451b7ba6567361 Mon Sep 17 00:00:00 2001 From: Iain Fraser Date: Thu, 19 May 2016 14:40:42 +0100 Subject: [PATCH] Implemented publish/subscribe lua bindings to libubus-lua with example lua files. --- lua/publisher.lua | 60 ++++++++++++ lua/subscriber.lua | 25 +++++ lua/ubus.c | 230 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 314 insertions(+), 1 deletion(-) create mode 100755 lua/publisher.lua create mode 100755 lua/subscriber.lua diff --git a/lua/publisher.lua b/lua/publisher.lua new file mode 100755 index 0000000..8ee3b83 --- /dev/null +++ b/lua/publisher.lua @@ -0,0 +1,60 @@ +#!/usr/bin/env lua + +require "ubus" +require "uloop" + +--[[ + A demo of ubus publisher binding. Should be run before subscriber.lua +--]] + + +uloop.init() + +local conn = ubus.connect() +if not conn then + error("Failed to connect to ubus") +end + +local ubus_objects = { + test = { + hello = { + function(req, msg) + conn:reply(req, {message="foo"}); + print("Call to function 'hello'") + for k, v in pairs(msg) do + print("key=" .. k .. " value=" .. tostring(v)) + end + end, {id = ubus.INT32, msg = ubus.STRING } + }, + hello1 = { + function(req) + conn:reply(req, {message="foo1"}); + conn:reply(req, {message="foo2"}); + print("Call to function 'hello1'") + end, {id = ubus.INT32, msg = ubus.STRING } + }, + __subscriber_cb = function( subs ) + print("total subs: ", subs ) + end + } +} + +conn:add( ubus_objects ) +print("Objects added, starting loop") + +-- start time +local timer +local counter = 0 +function t() + counter = counter + 1 + local params = { + count = counter + } + conn:notify( ubus_objects.test.__ubusobj, "test.alarm", params ) + timer:set(10000) +end +timer = uloop.timer(t) +timer:set(1000) + + +uloop.run() diff --git a/lua/subscriber.lua b/lua/subscriber.lua new file mode 100755 index 0000000..e1d3a9f --- /dev/null +++ b/lua/subscriber.lua @@ -0,0 +1,25 @@ +#!/usr/bin/env lua + +--[[ + A demo of ubus subscriber binding. Should be run after publisher.lua +--]] + +require "ubus" +require "uloop" + +uloop.init() + +local conn = ubus.connect() +if not conn then + error("Failed to connect to ubus") +end + +local sub = { + notify = function( msg ) + print("Count: ", msg["count"]) + end, +} + +conn:subscribe( "test", sub ) + +uloop.run() diff --git a/lua/ubus.c b/lua/ubus.c index 86e34b7..f59af90 100644 --- a/lua/ubus.c +++ b/lua/ubus.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2012 Jo-Philipp Wich * Copyright (C) 2012 John Crispin + * Copyright (C) 2016 Iain Fraser * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License version 2.1 @@ -33,6 +34,7 @@ struct ubus_lua_connection { struct ubus_lua_object { struct ubus_object o; int r; + int rsubscriber; }; struct ubus_lua_event { @@ -40,6 +42,12 @@ struct ubus_lua_event { int r; }; +struct ubus_lua_subscriber { + struct ubus_subscriber s; + int rnotify; + int rremove; +}; + static int ubus_lua_parse_blob(lua_State *L, struct blob_attr *attr, bool table); @@ -412,6 +420,39 @@ static int ubus_lua_load_methods(lua_State *L, struct ubus_method *m) return 0; } +static void +ubus_new_sub_cb(struct ubus_context *ctx, struct ubus_object *obj) +{ + struct ubus_lua_object *luobj; + + luobj = container_of(obj, struct ubus_lua_object, o); + + lua_getglobal(state, "__ubus_cb_publisher"); + lua_rawgeti(state, -1, luobj->rsubscriber); + lua_remove(state, -2); + + if (lua_isfunction(state, -1)) { + lua_pushnumber(state, luobj->o.has_subscribers ); + lua_call(state, 1, 0); + } else { + lua_pop(state, 1); + } +} + +static void +ubus_lua_load_newsub_cb( lua_State *L, struct ubus_lua_object *obj ) +{ + /* keep ref to func */ + lua_getglobal(L, "__ubus_cb_publisher"); + lua_pushvalue(L, -2); + obj->rsubscriber = luaL_ref(L, -2); + lua_pop(L, 1); + + /* real callback */ + obj->o.subscribe_cb = ubus_new_sub_cb; + return; +} + static struct ubus_object* ubus_lua_load_object(lua_State *L) { struct ubus_lua_object *obj = NULL; @@ -454,6 +495,13 @@ static struct ubus_object* ubus_lua_load_object(lua_State *L) /* scan each method */ lua_pushnil(L); while (lua_next(L, -3) != 0) { + /* check if its the subscriber notification callback */ + if( lua_type( L, -2 ) == LUA_TSTRING && + lua_type( L, -1 ) == LUA_TFUNCTION ){ + if( !strcmp( lua_tostring( L, -2 ), "__subscriber_cb" ) ) + ubus_lua_load_newsub_cb( L, obj ); + } + /* check if it looks like a method */ if ((lua_type(L, -2) != LUA_TSTRING) || (lua_type(L, -1) != LUA_TTABLE) || @@ -495,8 +543,14 @@ static int ubus_lua_add(lua_State *L) if ((lua_type(L, -2) == LUA_TSTRING) && (lua_type(L, -1) == LUA_TTABLE)) { obj = ubus_lua_load_object(L); - if (obj) + if (obj){ ubus_add_object(c->ctx, obj); + + /* allow future reference of ubus obj */ + lua_pushstring(state,"__ubusobj"); + lua_pushlightuserdata(state, obj); + lua_settable(state,-3); + } } lua_pop(L, 1); } @@ -504,6 +558,34 @@ static int ubus_lua_add(lua_State *L) return 0; } +static int +ubus_lua_notify( lua_State *L ) +{ + struct ubus_lua_connection *c; + struct ubus_object *obj; + const char* method; + + c = luaL_checkudata(L, 1, METANAME); + method = luaL_checkstring(L, 3); + luaL_checktype(L, 4, LUA_TTABLE); + + if( !lua_islightuserdata( L, 2 ) ){ + lua_pushfstring( L, "Invald 2nd parameter, expected ubus obj ref" ); + lua_error( L ); + } + obj = lua_touserdata( L, 2 ); + + /* create parameters from table */ + blob_buf_init(&c->buf, 0); + if( !ubus_lua_format_blob_array( L, &c->buf, true ) ){ + lua_pushfstring( L, "Invalid 4th parameter, expected table of arguments" ); + lua_error( L ); + } + + ubus_notify( c->ctx, obj, method, c->buf.head, -1 ); + return 0; +} + static void ubus_lua_signatures_cb(struct ubus_context *c, struct ubus_object_data *o, void *p) { @@ -653,6 +735,143 @@ ubus_lua_listen(lua_State *L) { return 0; } +static void +ubus_sub_remove_handler(struct ubus_context *ctx, struct ubus_subscriber *s, + uint32_t id) +{ + struct ubus_lua_subscriber *sub; + + sub = container_of(s, struct ubus_lua_subscriber, s); + + lua_getglobal(state, "__ubus_cb_subscribe"); + lua_rawgeti(state, -1, sub->rremove); + lua_remove(state, -2); + + if (lua_isfunction(state, -1)) { + lua_call(state, 0, 0); + } else { + lua_pop(state, 1); + } +} + +static int +ubus_sub_notify_handler(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, const char *method, + struct blob_attr *msg) +{ + struct ubus_subscriber *s; + struct ubus_lua_subscriber *sub; + + s = container_of(obj, struct ubus_subscriber, obj); + sub = container_of(s, struct ubus_lua_subscriber, s); + + lua_getglobal(state, "__ubus_cb_subscribe"); + lua_rawgeti(state, -1, sub->rnotify); + lua_remove(state, -2); + + if (lua_isfunction(state, -1)) { + if( msg ){ + ubus_lua_parse_blob_array(state, blob_data(msg), blob_len(msg), true); + lua_call(state, 1, 0); + } else { + lua_call(state, 0, 0); + } + } else { + lua_pop(state, 1); + } + + return 0; +} + + + +static void +ubus_lua_do_subscribe( struct ubus_context *ctx, lua_State *L, const char* target, + int idxnotify, int idxremove ) +{ + uint32_t id; + int status; + struct ubus_lua_subscriber *sub; + + if( ( status = ubus_lookup_id( ctx, target, &id ) ) ){ + lua_pushfstring( L, "Unable find target, status=%d", status ); + lua_error( L ); + } + + sub = malloc( sizeof( struct ubus_lua_subscriber ) ); + memset( sub, 0, sizeof( struct ubus_lua_subscriber ) ); + if( !sub ){ + lua_pushstring( L, "Out of memory" ); + lua_error( L ); + } + + if( idxnotify ){ + lua_getglobal(L, "__ubus_cb_subscribe"); + lua_pushvalue(L, idxnotify); + sub->rnotify = luaL_ref(L, -2); + lua_pop(L, 1); + sub->s.cb = ubus_sub_notify_handler; + } + + if( idxremove ){ + lua_getglobal(L, "__ubus_cb_subscribe"); + lua_pushvalue(L, idxnotify); + sub->rnotify = luaL_ref(L, -2); + lua_pop(L, 1); + sub->s.remove_cb = ubus_sub_remove_handler; + } + + if( ( status = ubus_register_subscriber( ctx, &sub->s ) ) ){ + lua_pushfstring( L, "Failed to register subscriber, status=%d", status ); + lua_error( L ); + } + + if( ( status = ubus_subscribe( ctx, &sub->s, id) ) ){ + lua_pushfstring( L, "Failed to register subscriber, status=%d", status ); + lua_error( L ); + } +} + +static int +ubus_lua_subscribe(lua_State *L) { + int idxnotify, idxremove, stackstart; + struct ubus_lua_connection *c; + const char* target; + + idxnotify = idxremove = 0; + stackstart = lua_gettop( L ); + + + c = luaL_checkudata(L, 1, METANAME); + target = luaL_checkstring(L, 2); + luaL_checktype(L, 3, LUA_TTABLE); + + + lua_pushstring( L, "notify"); + lua_gettable( L, 3 ); + if( lua_type( L, -1 ) == LUA_TFUNCTION ){ + idxnotify = lua_gettop( L ); + } else { + lua_pop( L, 1 ); + } + + lua_pushstring( L, "remove"); + lua_gettable( L, 3 ); + if( lua_type( L, -1 ) == LUA_TFUNCTION ){ + idxremove = lua_gettop( L ); + } else { + lua_pop( L, 1 ); + } + + if( idxnotify ) + ubus_lua_do_subscribe( c->ctx, L, target, idxnotify, idxremove ); + + if( lua_gettop( L ) > stackstart ) + lua_pop( L, lua_gettop( L ) - stackstart ); + + return 0; +} + static int ubus_lua_send(lua_State *L) { @@ -699,12 +918,14 @@ static const luaL_Reg ubus[] = { { "connect", ubus_lua_connect }, { "objects", ubus_lua_objects }, { "add", ubus_lua_add }, + { "notify", ubus_lua_notify }, { "reply", ubus_lua_reply }, { "signatures", ubus_lua_signatures }, { "call", ubus_lua_call }, { "close", ubus_lua__gc }, { "listen", ubus_lua_listen }, { "send", ubus_lua_send }, + { "subscribe", ubus_lua_subscribe }, { "__gc", ubus_lua__gc }, { NULL, NULL }, }; @@ -758,5 +979,12 @@ luaopen_ubus(lua_State *L) lua_createtable(L, 1, 0); lua_setglobal(L, "__ubus_cb_event"); + /* create the subscriber table */ + lua_createtable(L, 1, 0); + lua_setglobal(L, "__ubus_cb_subscribe"); + + /* create the publisher table - notifications of new subs */ + lua_createtable(L, 1, 0); + lua_setglobal(L, "__ubus_cb_publisher"); return 0; } -- 2.25.1