/**
* Message to request monitoring messages, clients -> DHT service.
*/
-struct GNUNET_DHT_MonitorStartMessage
+struct GNUNET_DHT_MonitorStartStopMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_START
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_(START|STOP)
*/
struct GNUNET_MessageHeader header;
void *cb_cls)
{
struct GNUNET_DHT_MonitorHandle *h;
- struct GNUNET_DHT_MonitorStartMessage *m;
+ struct GNUNET_DHT_MonitorStartStopMessage *m;
struct PendingMessage *pending;
h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
memcpy (h->key, key, sizeof(GNUNET_HashCode));
}
- pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) +
+ pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
sizeof (struct PendingMessage));
- m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1];
+ m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
pending->msg = &m->header;
pending->handle = handle;
pending->free_on_send = GNUNET_YES;
m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
- m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage));
+ m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
m->type = htonl(type);
- m->get = (NULL != get_cb);
- m->get_resp = (NULL != get_resp_cb);
- m->put = (NULL != put_cb);
+ m->get = htons(NULL != get_cb);
+ m->get_resp = htons(NULL != get_resp_cb);
+ m->put = htons(NULL != put_cb);
if (NULL != key) {
- m->filter_key = 1;
+ m->filter_key = htons(1);
memcpy (&m->key, key, sizeof(GNUNET_HashCode));
}
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
void
GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
{
- GNUNET_free_non_null (handle->key);
+ struct GNUNET_DHT_MonitorStartStopMessage *m;
+ struct PendingMessage *pending;
+
GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
handle->dht_handle->monitor_tail,
handle);
- /* FIXME notify service of stop */
+
+ pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
+ sizeof (struct PendingMessage));
+ m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
+ pending->msg = &m->header;
+ pending->handle = handle->dht_handle;
+ pending->free_on_send = GNUNET_YES;
+ m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
+ m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
+ m->type = htonl(handle->type);
+ m->get = htons(NULL != handle->get_cb);
+ m->get_resp = htons(NULL != handle->get_resp_cb);
+ m->put = htons(NULL != handle->put_cb);
+ if (NULL != handle->key) {
+ m->filter_key = htons(1);
+ memcpy (&m->key, handle->key, sizeof(GNUNET_HashCode));
+ }
+ GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
+ handle->dht_handle->pending_tail,
+ pending);
+ pending->in_pending_queue = GNUNET_YES;
+ process_pending_messages (handle->dht_handle);
+
+ GNUNET_free_non_null (handle->key);
GNUNET_free (handle);
}
/**
- * Handler for monitor messages
+ * Handler for monitor start messages
*
* @param cls closure for the service
* @param client the client we received this message from
const struct GNUNET_MessageHeader *message)
{
struct ClientMonitorRecord *r;
- const struct GNUNET_DHT_MonitorStartMessage *msg;
- unsigned int i;
- char *c;
+ const struct GNUNET_DHT_MonitorStartStopMessage *msg;
- msg = (struct GNUNET_DHT_MonitorStartMessage *) message;
+ msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
r = GNUNET_malloc (sizeof(struct ClientMonitorRecord));
r->client = find_active_client(client);
r->type = ntohl(msg->type);
- r->get = msg->get;
- r->get_resp = msg->get_resp;
- r->put = msg->put;
- c = (char *) &msg->key;
- for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++);
- if (sizeof (GNUNET_HashCode) == i)
- r->key = NULL;
+ r->get = ntohs(msg->get);
+ r->get_resp = ntohs(msg->get_resp);
+ r->put = ntohs(msg->put);
+ if (0 == ntohs(msg->filter_key))
+ r->key = NULL;
else
{
r->key = GNUNET_malloc (sizeof (GNUNET_HashCode));
memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode));
}
GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r);
- // FIXME add remove somewhere
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+/**
+ * Handler for monitor stop messages
+ *
+ * @param cls closure for the service
+ * @param client the client we received this message from
+ * @param message the actual message received
+ *
+ */
+static void
+handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct ClientMonitorRecord *r;
+ const struct GNUNET_DHT_MonitorStartStopMessage *msg;
+ int keys_match;
+
+ msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
+ r = monitor_head;
+
+ while (NULL != r)
+ {
+ if (NULL == r->key)
+ keys_match = (0 == ntohs(msg->filter_key));
+ else
+ {
+ keys_match = (0 != ntohs(msg->filter_key)
+ && !memcmp(r->key, &msg->key, sizeof(GNUNET_HashCode)));
+ }
+ if (find_active_client(client) == r->client
+ && ntohl(msg->type) == r->type
+ && r->get == msg->get
+ && r->get_resp == msg->get_resp
+ && r->put == msg->put
+ && keys_match
+ )
+ {
+ GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r);
+ GNUNET_free_non_null (r->key);
+ GNUNET_free (r);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ return; /* Delete only ONE entry */
+ }
+ r = r->next;
+ }
+
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
{&handle_dht_local_monitor, NULL,
GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
- sizeof (struct GNUNET_DHT_MonitorStartMessage)},
+ sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
+ {&handle_dht_local_monitor_stop, NULL,
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
+ sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
{NULL, NULL, 0, 0}
};
forward_map = GNUNET_CONTAINER_multihashmap_create (1024);