#define CORE_QUEUE_SIZE 10
+#define LOCAL_QUEUE_SIZE 100
#define REFRESH_PATH_TIME GNUNET_TIME_relative_multiply(\
GNUNET_TIME_UNIT_SECONDS,\
300)
/**
* All the clients
*/
-static struct MeshClient *clients;
-static struct MeshClient *clients_tail;
+static struct MeshClient *clients;
+static struct MeshClient *clients_tail;
/**
* Tunnels known, indexed by MESH_TunnelID (MeshTunnel)
/**
* Handle to communicate with core
*/
-static struct GNUNET_CORE_Handle *core_handle;
+static struct GNUNET_CORE_Handle *core_handle;
/**
* Handle to use DHT
*/
-static struct GNUNET_DHT_Handle *dht_handle;
+static struct GNUNET_DHT_Handle *dht_handle;
/**
* Handle to server
*/
-static struct GNUNET_SERVER_Handle *server_handle;
+static struct GNUNET_SERVER_Handle *server_handle;
+
+/**
+ * Notification context, to send messages to local clients
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
/**
* Local peer own ID (memory efficient handle)
*/
-static GNUNET_PEER_Id myid;
+static GNUNET_PEER_Id myid;
/**
* Tunnel ID for the next created tunnel (global tunnel number)
*/
-static MESH_TunnelNumber next_tid;
+static MESH_TunnelNumber next_tid;
/******************************************************************************/
/****************** GENERAL HELPER FUNCTIONS ************************/
/**
- * Function called to notify a client about the socket
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the socket was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * Send the message to all clients that have subscribed to its type
+ *
+ * @param msg Pointer to the message itself
+ * @return number of clients this message was sent to
*/
-size_t
-send_client_raw (void *cls, size_t size, void *buf)
+static unsigned int
+send_subscribed_clients (struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_MessageHeader *msg = cls;
- size_t msg_size;
-
- msg_size = ntohs(msg->size);
- if (msg_size > size) {
- GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
- "deliver to client failed: buffer too small\n");
- return 0;
+ struct MeshClient *c;
+ unsigned int count;
+ uint16_t type;
+
+ type = ntohs(msg->type);
+ for (count = 0, c = clients; c != NULL; c = c->next) {
+ if (is_client_subscribed(type, c)) {
+ count++;
+ GNUNET_SERVER_notification_context_unicast(nc,
+ c->handle,
+ msg,
+ GNUNET_YES);
+ }
}
- memcpy(buf, cls, msg_size);
- return msg_size;
+ return count;
}
uint16_t payload_type;
size = ntohs(message->size);
- if (size < sizeof(struct GNUNET_MESH_DataMessageFromOrigin)) {
+ if (size < sizeof(struct GNUNET_MESH_DataMessageFromOrigin)
+ + sizeof(struct GNUNET_MessageHeader))
+ {
GNUNET_break(0);
return GNUNET_OK;
}
return GNUNET_OK;
}
if (pi->id == myid) {
- payload_type = ntohs(msg[1].header.type);
- for (c = clients; NULL != c; c = c->next) {
- if (is_client_subscribed(payload_type, c)) {
- /* FIXME copy data to buffer (info), msg will expire */
- GNUNET_SERVER_notify_transmit_ready(c->handle,
- size - sizeof(struct GNUNET_MESH_DataMessageFromOrigin),
- GNUNET_TIME_UNIT_FOREVER_REL,
- send_client_raw,
- &msg[1]);
- }
- }
+ send_subscribed_clients((struct GNUNET_MessageHeader *)&msg[1]);
return GNUNET_OK;
}
GNUNET_PEER_resolve(get_first_hop(pi->path), &id);
struct MeshTunnel *t;
struct MeshClient *c;
struct MeshDataDescriptor *dd;
- struct GNUNET_SERVER_NotificationContext *nc;
GNUNET_PEER_Id *neighbors;
size_t size;
uint16_t type;
size = ntohs(message->size);
- if (size < sizeof(struct GNUNET_MESH_DataMessageMulticast)) {
+ if (size < sizeof(struct GNUNET_MESH_DataMessageMulticast)
+ + sizeof(struct GNUNET_MessageHeader))
+ {
GNUNET_break_op (0);
return GNUNET_OK;
}
t = retrieve_tunnel(&msg->oid, ntohl(msg->tid));
if (NULL == t) {
+ /* TODO notify that we dont know that tunnel */
return GNUNET_OK;
}
/* Transmit to locally interested clients */
GNUNET_PEER_resolve(myid, &id);
if (GNUNET_CONTAINER_multihashmap_contains(t->peers, &id.hashPubKey)) {
- type = ntohs(msg[1].header.type);
- nc = GNUNET_SERVER_notification_context_create(server_handle,
- CORE_QUEUE_SIZE);
- for (c = clients; c != NULL; c = c->next) {
- if (is_client_subscribed(type, c)) {
- GNUNET_SERVER_notification_context_add(nc, c->handle);
- }
- }
- GNUNET_SERVER_notification_context_broadcast(nc, message, GNUNET_NO);
- GNUNET_SERVER_notification_context_destroy(nc);
- /* FIXME is this right? better to do like in core retransmissions? */
+ send_subscribed_clients((struct GNUNET_MessageHeader *)&msg[1]);
}
/* Retransmit to other peers */
size_t size;
size = ntohs(message->size);
- if (size < sizeof(struct GNUNET_MESH_DataMessageToOrigin)) {
+ if (size < sizeof(struct GNUNET_MESH_DataMessageToOrigin)
+ + sizeof(struct GNUNET_MessageHeader))
+ {
GNUNET_break_op (0);
return GNUNET_OK; // FIXME maybe SYSERR? peer misbehaving?
}
c = c->next;
}
}
+
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
" done!\n");
return;
GNUNET_CONTAINER_DLL_insert(clients, clients_tail, c);
c->tunnels = GNUNET_CONTAINER_multihashmap_create(32);
+ GNUNET_SERVER_notification_context_add(nc, client);
GNUNET_SERVER_receive_done(client, GNUNET_OK);
GNUNET_DHT_disconnect (dht_handle);
dht_handle = NULL;
}
+ if (nc != NULL) {
+ GNUNET_SERVER_notification_context_destroy(nc);
+ nc = NULL;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"shut down\n");
}
tunnels = GNUNET_CONTAINER_multihashmap_create(32);
peers = GNUNET_CONTAINER_multihashmap_create(32);
+ nc = GNUNET_SERVER_notification_context_create(server_handle,
+ LOCAL_QUEUE_SIZE);
clients = NULL;
clients_tail = NULL;