-modify timeout values further
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
index f26d777929c1f0050d68a08786b83439d3a92110..173a1c3efdefd2ac79cd06b35fd6d40e9bff1d03 100644 (file)
@@ -111,7 +111,7 @@ struct ClientQueryRecord
   /**
    * The key this request was about
    */
-  GNUNET_HashCode key;
+  struct GNUNET_HashCode key;
 
   /**
    * Client responsible for the request.
@@ -126,7 +126,7 @@ struct ClientQueryRecord
   /**
    * Replies we have already seen for this request.
    */
-  GNUNET_HashCode *seen_replies;
+  struct GNUNET_HashCode *seen_replies;
 
   /**
    * Pointer to this nodes heap location in the retry-heap (for fast removal)
@@ -201,7 +201,7 @@ struct ClientMonitorRecord
   /**
    * Key of data of interest, NULL for all.
    */
-  GNUNET_HashCode         *key;
+  struct GNUNET_HashCode         *key;
 
   /**
    * Flag whether to notify about GET messages.
@@ -261,6 +261,31 @@ static struct GNUNET_CONTAINER_Heap *retry_heap;
 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
 
 
+/**
+ * Task run to check for messages that need to be sent to a client.
+ *
+ * @param client a ClientList, containing the client and any messages to be sent to it
+ */
+static void
+process_pending_messages (struct ClientList *client);
+
+
+/**
+ * Add a PendingMessage to the clients list of messages to be sent
+ *
+ * @param client the active client to send the message to
+ * @param pending_message the actual message to send
+ */
+static void
+add_pending_message (struct ClientList *client,
+                     struct PendingMessage *pending_message)
+{
+  GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
+                                    pending_message);
+  process_pending_messages (client);
+}
+
+
 /**
  * Find a client if it exists, add it otherwise.
  *
@@ -297,18 +322,16 @@ find_active_client (struct GNUNET_SERVER_Client *client)
  * @return GNUNET_YES (we should continue to iterate)
  */
 static int
-remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
+remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value)
 {
   struct ClientList *client = cls;
   struct ClientQueryRecord *record = value;
 
   if (record->client != client)
     return GNUNET_YES;
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Removing client %p's record for key %s\n", client,
               GNUNET_h2s (key));
-#endif
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
                                                        record));
@@ -335,9 +358,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
   struct PendingMessage *reply;
   struct ClientMonitorRecord *monitor;
 
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client);
-#endif
   pos = find_active_client (client);
   GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
   if (pos->transmit_handle != NULL)
@@ -464,6 +485,8 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
   uint16_t size;
+  struct PendingMessage *pm;
+  struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
 
   size = ntohs (message->size);
   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
@@ -478,12 +501,10 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
                             GNUNET_NO);
   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
   /* give to local clients */
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Handling local PUT of %u-bytes for query %s\n",
               size - sizeof (struct GNUNET_DHT_ClientPutMessage),
               GNUNET_h2s (&dht_msg->key));
-#endif
   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
                             &dht_msg->key, 0, NULL, 0, NULL,
                             ntohl (dht_msg->type),
@@ -516,6 +537,15 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
                            &dht_msg[1],
                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
+                     sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
+  conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
+  conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
+  conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
+  conf->reserved = htonl (0);
+  conf->unique_id = dht_msg->unique_id;
+  pm->msg = &conf->header;
+  add_pending_message (find_active_client (client), pm);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -553,11 +583,9 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
                             gettext_noop
                             ("# GET requests received from clients"), 1,
                             GNUNET_NO);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received request for %s from local client %p\n",
               GNUNET_h2s (&get->key), client);
-#endif
   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
   cqr->key = get->key;
   cqr->client = find_active_client (client);
@@ -618,18 +646,16 @@ struct RemoveByUniqueIdContext
  * @return GNUNET_YES (we should continue to iterate)
  */
 static int
-remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
+remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value)
 {
   const struct RemoveByUniqueIdContext *ctx = cls;
   struct ClientQueryRecord *record = value;
 
   if (record->unique_id != ctx->unique_id)
     return GNUNET_YES;
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Removing client %p's record for key %s (by unique id)\n",
               ctx->client->client_handle, GNUNET_h2s (key));
-#endif
   return remove_client_records (ctx->client, key, record);
 }
 
@@ -655,10 +681,8 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
                             gettext_noop
                             ("# GET STOP requests received from clients"), 1,
                             GNUNET_NO);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key %s\n",
               client, GNUNET_h2s (&dht_stop_msg->key));
-#endif
   ctx.client = find_active_client (client);
   ctx.unique_id = dht_stop_msg->unique_id;
   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key,
@@ -668,7 +692,7 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
 
 
 /**
- * Handler for monitor messages
+ * Handler for monitor start messages
  *
  * @param cls closure for the service
  * @param client the client we received this message from
@@ -680,40 +704,74 @@ handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
                           const struct GNUNET_MessageHeader *message)
 {
   struct ClientMonitorRecord *r;
-  const struct GNUNET_DHT_MonitorStartMessage *msg;
-  unsigned int i;
-  char *c;
+  const struct GNUNET_DHT_MonitorStartStopMessage *msg;
 
-  msg = (struct GNUNET_DHT_MonitorStartMessage *) message;
+  msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
   r = GNUNET_malloc (sizeof(struct ClientMonitorRecord));
 
   r->client = find_active_client(client);
   r->type = ntohl(msg->type);
-  r->get = msg->get;
-  r->get_resp = msg->get_resp;
-  r->put = msg->put;
-  c = (char *) &msg->key;
-  for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++);
-  if (sizeof (GNUNET_HashCode) == i)
-    r->key = NULL;
+  r->get = ntohs(msg->get);
+  r->get_resp = ntohs(msg->get_resp);
+  r->put = ntohs(msg->put);
+  if (0 == ntohs(msg->filter_key))
+      r->key = NULL;
   else
   {
-    r->key = GNUNET_malloc (sizeof (GNUNET_HashCode));
-    memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode));
+    r->key = GNUNET_malloc (sizeof (struct GNUNET_HashCode));
+    memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode));
   }
   GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r);
-  // FIXME add remove somewhere
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
-
 /**
- * Task run to check for messages that need to be sent to a client.
+ * Handler for monitor stop messages
+ *
+ * @param cls closure for the service
+ * @param client the client we received this message from
+ * @param message the actual message received
  *
- * @param client a ClientList, containing the client and any messages to be sent to it
  */
 static void
-process_pending_messages (struct ClientList *client);
+handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client,
+                               const struct GNUNET_MessageHeader *message)
+{
+  struct ClientMonitorRecord *r;
+  const struct GNUNET_DHT_MonitorStartStopMessage *msg;
+  int keys_match;
+
+  msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
+  r = monitor_head;
+
+  while (NULL != r)
+  {
+    if (NULL == r->key)
+        keys_match = (0 == ntohs(msg->filter_key));
+    else
+    {
+        keys_match = (0 != ntohs(msg->filter_key)
+                      && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode)));
+    }
+    if (find_active_client(client) == r->client
+        && ntohl(msg->type) == r->type
+        && r->get == msg->get
+        && r->get_resp == msg->get_resp
+        && r->put == msg->put
+        && keys_match
+        )
+    {
+        GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r);
+        GNUNET_free_non_null (r->key);
+        GNUNET_free (r);
+        GNUNET_SERVER_receive_done (client, GNUNET_OK);
+        return; /* Delete only ONE entry */
+    }
+    r = r->next;
+  }
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
 
 
 /**
@@ -741,11 +799,9 @@ send_reply_to_client (void *cls, size_t size, void *buf)
   if (buf == NULL)
   {
     /* client disconnected */
-#if DEBUG_DHT
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Client %p disconnected, pending messages will be discarded\n",
                 client->client_handle);
-#endif
     return 0;
   }
   off = 0;
@@ -756,17 +812,13 @@ send_reply_to_client (void *cls, size_t size, void *buf)
                                  reply);
     memcpy (&cbuf[off], reply->msg, msize);
     GNUNET_free (reply);
-#if DEBUG_DHT
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n",
                 msize, client->client_handle);
-#endif
     off += msize;
   }
   process_pending_messages (client);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n",
               (unsigned int) off, (unsigned int) size, client->client_handle);
-#endif
   return off;
 }
 
@@ -781,20 +833,16 @@ process_pending_messages (struct ClientList *client)
 {
   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
   {
-#if DEBUG_DHT
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Not asking for transmission to %p now: %s\n",
                 client->client_handle,
                 client->pending_head ==
                 NULL ? "no more messages" : "request already pending");
-#endif
     return;
   }
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Asking for transmission of %u bytes to client %p\n",
               ntohs (client->pending_head->msg->size), client->client_handle);
-#endif
   client->transmit_handle =
       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
                                            ntohs (client->pending_head->
@@ -804,22 +852,6 @@ process_pending_messages (struct ClientList *client)
 }
 
 
-/**
- * Add a PendingMessage to the clients list of messages to be sent
- *
- * @param client the active client to send the message to
- * @param pending_message the actual message to send
- */
-static void
-add_pending_message (struct ClientList *client,
-                     struct PendingMessage *pending_message)
-{
-  GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
-                                    pending_message);
-  process_pending_messages (client);
-}
-
-
 /**
  * Closure for 'forward_reply'
  */
@@ -866,7 +898,7 @@ struct ForwardReplyContext
  *         if the result is mal-formed, GNUNET_NO
  */
 static int
-forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
+forward_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
 {
   struct ForwardReplyContext *frc = cls;
   struct ClientQueryRecord *record = value;
@@ -874,16 +906,14 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
   struct GNUNET_DHT_ClientResultMessage *reply;
   enum GNUNET_BLOCK_EvaluationResult eval;
   int do_free;
-  GNUNET_HashCode ch;
+  struct GNUNET_HashCode ch;
   unsigned int i;
 
   if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type))
   {
-#if DEBUG_DHT
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Record type missmatch, not passing request for key %s to local client\n",
                 GNUNET_h2s (key));
-#endif
     GNUNET_STATISTICS_update (GDS_stats,
                               gettext_noop
                               ("# Key match, type mismatches in REPLY to CLIENT"),
@@ -892,13 +922,11 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
   }
   GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
   for (i = 0; i < record->seen_replies_count; i++)
-    if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode)))
+    if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode)))
     {
-#if DEBUG_DHT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Duplicate reply, not passing request for key %s to local client\n",
                   GNUNET_h2s (key));
-#endif
       GNUNET_STATISTICS_update (GDS_stats,
                                 gettext_noop
                                 ("# Duplicate REPLIES to CLIENT request dropped"),
@@ -909,11 +937,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
       GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0,
                              record->xquery, record->xquery_size, frc->data,
                              frc->data_size);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Evaluation result is %d for key %s for local client's query\n",
               (int) eval, GNUNET_h2s (key));
-#endif
   switch (eval)
   {
   case GNUNET_BLOCK_EVALUATION_OK_LAST:
@@ -964,11 +990,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
                             GNUNET_NO);
   reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
   reply->unique_id = record->unique_id;
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Queueing reply to query %s for client %p\n", GNUNET_h2s (key),
               record->client->client_handle);
-#endif
   add_pending_message (record->client, pm);
   if (GNUNET_YES == do_free)
     remove_client_records (record->client, key, record);
@@ -993,7 +1017,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
  */
 void
 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
-                          const GNUNET_HashCode * key,
+                          const struct GNUNET_HashCode * key,
                           unsigned int get_path_length,
                           const struct GNUNET_PeerIdentity *get_path,
                           unsigned int put_path_length,
@@ -1080,7 +1104,7 @@ GDS_CLIENTS_process_get (uint32_t options,
                          uint32_t desired_replication_level, 
                          unsigned int path_length,
                          const struct GNUNET_PeerIdentity *path,
-                         const GNUNET_HashCode * key)
+                         const struct GNUNET_HashCode * key)
 {
   struct ClientMonitorRecord *m;
   struct ClientList **cl;
@@ -1092,7 +1116,7 @@ GDS_CLIENTS_process_get (uint32_t options,
   {
     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
         (NULL == m->key ||
-         memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+         memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
     {
       struct PendingMessage *pm;
       struct GNUNET_DHT_MonitorGetMessage *mmsg;
@@ -1113,7 +1137,7 @@ GDS_CLIENTS_process_get (uint32_t options,
       msize += sizeof (struct PendingMessage);
       pm = (struct PendingMessage *) GNUNET_malloc (msize);
       mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
-      pm->msg = (struct GNUNET_MessageHeader *) mmsg;
+      pm->msg = &mmsg->header;
       mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
       mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
       mmsg->options = htonl(options);
@@ -1121,7 +1145,7 @@ GDS_CLIENTS_process_get (uint32_t options,
       mmsg->hop_count = htonl(hop_count);
       mmsg->desired_replication_level = htonl(desired_replication_level);
       mmsg->get_path_length = htonl(path_length);
-      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+      memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
       if (path_length > 0)
         memcpy (msg_path, path,
@@ -1154,7 +1178,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
                               const struct GNUNET_PeerIdentity *put_path,
                               unsigned int put_path_length,
                               struct GNUNET_TIME_Absolute exp,
-                              const GNUNET_HashCode * key,
+                              const struct GNUNET_HashCode * key,
                               const void *data,
                               size_t size)
 {
@@ -1168,7 +1192,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
   {
     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
         (NULL == m->key ||
-         memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+         memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
     {
       struct PendingMessage *pm;
       struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
@@ -1208,7 +1232,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
         memcpy (path, get_path,
                 get_path_length * sizeof (struct GNUNET_PeerIdentity));
       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
-      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+      memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
       if (size > 0)
         memcpy (&path[get_path_length], data, size);
       add_pending_message (m->client, pm);
@@ -1241,7 +1265,7 @@ GDS_CLIENTS_process_put (uint32_t options,
                          unsigned int path_length,
                          const struct GNUNET_PeerIdentity *path,
                          struct GNUNET_TIME_Absolute exp,
-                         const GNUNET_HashCode * key,
+                         const struct GNUNET_HashCode * key,
                          const void *data,
                          size_t size)
 {
@@ -1255,7 +1279,7 @@ GDS_CLIENTS_process_put (uint32_t options,
   {
     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
         (NULL == m->key ||
-         memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+         memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
     {
       struct PendingMessage *pm;
       struct GNUNET_DHT_MonitorPutMessage *mmsg;
@@ -1292,7 +1316,7 @@ GDS_CLIENTS_process_put (uint32_t options,
                 path_length * sizeof (struct GNUNET_PeerIdentity));
       }
       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
-      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+      memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
       if (size > 0)
         memcpy (&msg_path[path_length], data, size);
       add_pending_message (m->client, pm);
@@ -1320,7 +1344,10 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
      sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
     {&handle_dht_local_monitor, NULL,
      GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
-     sizeof (struct GNUNET_DHT_MonitorStartMessage)},
+     sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
+    {&handle_dht_local_monitor_stop, NULL,
+     GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
+     sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
     {NULL, NULL, 0, 0}
   };
   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);