-add missing comments, expand error checking
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
index dde8c6d7a03a30dbe83b3e24d30e6625372114af..a5ac0c32b1ac00ac6be087666949c3352404173c 100644 (file)
@@ -177,6 +177,39 @@ struct ClientQueryRecord
 };
 
 
+/**
+ * Struct containing paremeters of monitoring requests.
+ */
+struct ClientMonitorRecord
+{
+
+  /**
+   * Next element in DLL.
+   */
+  struct ClientMonitorRecord    *next;
+
+  /**
+   * Previous element in DLL.
+   */
+  struct ClientMonitorRecord    *prev;
+  
+  /**
+   * Type of blocks that are of interest
+   */
+  enum GNUNET_BLOCK_Type        type;
+
+  /**
+   * Key of data of interest, NULL for all.
+   */
+  GNUNET_HashCode         *key;
+
+  /**
+   * Client to notify of these requests.
+   */
+  struct ClientList             *client;
+};
+
+
 /**
  * List of active clients.
  */
@@ -187,6 +220,16 @@ static struct ClientList *client_head;
  */
 static struct ClientList *client_tail;
 
+/**
+ * List of active monitoring requests.
+ */
+static struct ClientMonitorRecord *monitor_head;
+
+/**
+ * List of active monitoring requests..
+ */
+static struct ClientMonitorRecord *monitor_tail;
+
 /**
  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
  */
@@ -275,6 +318,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
 {
   struct ClientList *pos;
   struct PendingMessage *reply;
+  struct ClientMonitorRecord *monitor;
 
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client);
@@ -288,6 +332,22 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
     GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply);
     GNUNET_free (reply);
   }
+  monitor = monitor_head;
+  while (NULL != monitor)
+  {
+    if (monitor->client == pos)
+    {
+      struct ClientMonitorRecord *next;
+      
+      GNUNET_free_non_null (monitor->key);
+      next = monitor->next;
+      GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, monitor);
+      GNUNET_free (monitor);
+      monitor = next;
+    }
+    else
+      monitor = monitor->next;
+  }
   GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records,
                                          pos);
   GNUNET_free (pos);
@@ -575,6 +635,43 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
 }
 
 
+/**
+ * Handler for monitor messages
+ *
+ * @param cls closure for the service
+ * @param client the client we received this message from
+ * @param message the actual message received
+ *
+ */
+static void
+handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
+                          const struct GNUNET_MessageHeader *message)
+{
+  struct ClientMonitorRecord *r;
+  const struct GNUNET_DHT_MonitorMessage *msg;
+  unsigned int i;
+  char *c;
+
+  msg = (struct GNUNET_DHT_MonitorMessage *) message;
+  r = GNUNET_malloc (sizeof(struct ClientMonitorRecord));
+
+  r->client = find_active_client(client);
+  r->type = ntohl(msg->type);
+  c = (char *) &msg->key;
+  for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++);
+  if (sizeof (GNUNET_HashCode) == i)
+    r->key = NULL;
+  else
+  {
+    r->key = GNUNET_malloc (sizeof (GNUNET_HashCode));
+    memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode));
+  }
+  GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r);
+  // FIXME add remove somewhere
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
 /**
  * Task run to check for messages that need to be sent to a client.
  *
@@ -929,6 +1026,91 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
 }
 
 
+/**
+ * Check if some client is monitoring messages of this type and notify
+ * him in that case.
+ *
+ * @param mtype Type of the DHT message.
+ * @param exp When will this value expire.
+ * @param key Key of the result/request.
+ * @param get_path Peers on reply path (or NULL if not recorded).
+ * @param get_path_length number of entries in get_path.
+ * @param put_path peers on the PUT path (or NULL if not recorded).
+ * @param put_path_length number of entries in get_path.
+ * @param desired_replication_level Desired replication level.
+ * @param type Type of the result/request.
+ * @param data Pointer to the result data.
+ * @param size Number of bytes in data.
+ */
+void
+GDS_CLIENTS_process_monitor (uint16_t mtype,
+                             const struct GNUNET_TIME_Absolute exp,
+                             const GNUNET_HashCode *key,
+                             uint32_t putl,
+                             const struct GNUNET_PeerIdentity *put_path,
+                             uint32_t getl,
+                             const struct GNUNET_PeerIdentity *get_path,
+                             uint32_t replevel,
+                             enum GNUNET_BLOCK_Type type,
+                             const struct GNUNET_MessageHeader *data,
+                             uint16_t size)
+{
+  struct ClientMonitorRecord *m;
+  struct ClientList **cl;
+  unsigned int cl_size;
+
+  cl = NULL;
+  cl_size = 0;
+  for (m = monitor_head; NULL != m; m = m->next)
+  {
+    if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
+        (NULL == m->key ||
+         memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+    {
+      struct PendingMessage *pm;
+      struct GNUNET_DHT_MonitorMessage *mmsg;
+      struct GNUNET_PeerIdentity *path;
+      size_t msize;
+      unsigned int i;
+
+      /* Don't send duplicates */
+      for (i = 0; i < cl_size; i++)
+        if (cl[i] == m->client)
+          break;
+      if (i < cl_size)
+        continue;
+      GNUNET_array_append (cl, cl_size, m->client);
+
+      msize = size;
+      msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity);
+      msize += sizeof (struct GNUNET_DHT_MonitorMessage);
+      msize += sizeof (struct PendingMessage);
+      pm = (struct PendingMessage *) GNUNET_malloc (msize);
+      mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1];
+      pm->msg = (struct GNUNET_MessageHeader *) mmsg;
+      mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
+      mmsg->header.type = htons (mtype);
+      mmsg->expiration = GNUNET_TIME_absolute_hton(exp);
+      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+      mmsg->put_path_length = htonl(putl);
+      mmsg->get_path_length = htonl(getl);
+      path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+      if (putl > 0)
+      {
+        memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity));
+        path = &path[putl];
+      }
+      if (getl > 0)
+        memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity));
+      if (size > 0)
+        memcpy (&path[getl], data, size);
+      add_pending_message (m->client, pm);
+    }
+  }
+  GNUNET_free_non_null (cl);
+}
+
+
 /**
  * Initialize client subsystem.
  *
@@ -945,6 +1127,9 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
     {&handle_dht_local_get_stop, NULL,
      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
      sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
+    {&handle_dht_local_monitor, NULL,
+     GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
+     sizeof (struct GNUNET_DHT_MonitorMessage)},
     {NULL, NULL, 0, 0}
   };
   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);