adding notification API to peerinfo
authorChristian Grothoff <christian@grothoff.org>
Mon, 11 Jan 2010 22:13:37 +0000 (22:13 +0000)
committerChristian Grothoff <christian@grothoff.org>
Mon, 11 Jan 2010 22:13:37 +0000 (22:13 +0000)
src/include/gnunet_peerinfo_service.h
src/include/gnunet_protocols.h
src/peerinfo/gnunet-service-peerinfo.c
src/peerinfo/peerinfo_api.c

index 54fc6803366b5e8bd7f7d8fec9116c48d47b0370..463131e6a81b7b3bcd97f4af4e816de6f2c20478 100644 (file)
@@ -99,6 +99,41 @@ GNUNET_PEERINFO_for_all (const struct GNUNET_CONFIGURATION_Handle *cfg,
                          void *callback_cls);
 
 
+/**
+ * Handle for notifications about changes to the set of known peers.
+ */
+struct GNUNET_PEERINFO_NotifyContext;
+
+
+/**
+ * Call a method whenever our known information about peers
+ * changes.  Initially calls the given function for all known
+ * peers and then only signals changes.  Note that it is
+ * possible (i.e. on disconnects) that the callback is called
+ * twice with the same peer information.
+ *
+ * @param cfg configuration to use
+ * @param sched scheduler to use
+ * @param callback the method to call for each peer
+ * @param callback_cls closure for callback
+ * @return NULL on error
+ */
+struct GNUNET_PEERINFO_NotifyContext *
+GNUNET_PEERINFO_notify (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                       struct GNUNET_SCHEDULER_Handle *sched,
+                       GNUNET_PEERINFO_Processor callback,
+                       void *callback_cls);
+
+
+/**
+ * Stop notifying about changes.
+ *
+ * @param nc context to stop notifying
+ */
+void
+GNUNET_PEERINFO_notify_cancel (struct GNUNET_PEERINFO_NotifyContext *nc);
+
+
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
 #endif
index 2fd1c26eb5bc759725d10779ac5db163923df2d1..fc73a57c367f17f1f8a63ffd3318a4783797b912 100644 (file)
@@ -211,6 +211,12 @@ extern "C"
  */
 #define GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END 36
 
+/**
+ * Start notifying this client about all changes to
+ * the known peers until it disconnects.
+ */
+#define GNUNET_MESSAGE_TYPE_PEERINFO_NOTIFY 37
+
 
 /**
  * Message by which a TCP transport notifies
index ee12878310784a6b45427c0ff0d115bc44c47cca..4323be478f267fffe89fbf574db12f1fdb02f33f 100644 (file)
@@ -26,6 +26,9 @@
  * structure of data/hosts/ and data/credit/).
  *
  * @author Christian Grothoff
+ *
+ * TODO:
+ * - HostEntries are never 'free'd (add expiration, upper bound?)
  */
 
 #include "platform.h"
@@ -84,11 +87,63 @@ struct HostEntry
 
 };
 
+
+/**
+ * Entries that we still need to tell the client about.
+ */
+struct PendingEntry
+{
+
+  /**
+   * This is a linked list.
+   */
+  struct PendingEntry *next;
+
+  /**
+   * Entry to tell the client about.
+   */
+  struct HostEntry *he;
+};
+
+
+/**
+ * Clients to notify of changes to the peer information.
+ */
+struct NotifyList
+{
+
+  /**
+   * This is a linked list.
+   */
+  struct NotifyList *next;
+
+  /**
+   * Client to notify.
+   */ 
+  struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Notifications pending for this entry.
+   */
+  struct PendingEntry *pending;
+
+  /**
+   * Handle for a transmit ready request.
+   */
+  struct GNUNET_CONNECTION_TransmitHandle *transmit_ctx;
+};
+
+
 /**
  * The in-memory list of known hosts.
  */
 static struct HostEntry *hosts;
 
+/**
+ * Clients to immediately notify about all changes.
+ */
+static struct NotifyList *notify_list;
+
 /**
  * Directory where the hellos are stored in (data/hosts)
  */
@@ -100,6 +155,116 @@ static char *networkIdDirectory;
 static char *trustDirectory;
 
 
+/**
+ * Transmit peer information messages from the pending queue
+ * to the client.
+ *
+ * @param cls the 'struct NotifyList' that we are processing
+ * @param size number of bytes we can transmit
+ * @param vbuf where to write the messages
+ * @return number of bytes written to vbuf
+ */
+static size_t
+transmit_pending_notification (void *cls,
+                              size_t size,
+                              void *vbuf)
+{
+  struct NotifyList *nl = cls;
+  char *buf = vbuf;
+  struct PendingEntry *pos;
+  struct PendingEntry *next;
+  struct InfoMessage im;
+  uint16_t hs;
+  size_t left;
+
+  nl->transmit_ctx = NULL;
+  next = nl->pending;
+  pos = nl->pending;
+  left = size;
+  while ( (pos != NULL) &&
+         (left >= sizeof (struct InfoMessage) + (hs = GNUNET_HELLO_size (pos->he->hello))) )
+    {
+      next = pos->next;
+      im.header.size = htons (hs + sizeof (struct InfoMessage));
+      im.header.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_INFO);
+      im.trust = htonl (pos->he->trust);
+      im.peer = pos->he->identity;
+      memcpy (&buf[size - left], &im, sizeof (struct InfoMessage));      
+      memcpy (&buf[size - left + sizeof (struct InfoMessage)], pos->he->hello, hs);
+      left -= hs + sizeof (struct InfoMessage);
+      GNUNET_free (pos);
+      pos = next;      
+    }
+  nl->pending = next;
+  if (nl->pending != NULL)
+    {
+      nl->transmit_ctx 
+       = GNUNET_SERVER_notify_transmit_ready (nl->client,
+                                              sizeof (struct InfoMessage) + hs,
+                                              GNUNET_TIME_UNIT_FOREVER_REL,
+                                              &transmit_pending_notification,
+                                              nl);
+    }
+  return size - left;
+}
+
+
+
+/**
+ * Notify client about host change.  Checks if the
+ * respective host entry is already in the list of things
+ * to send to the client, and if not, adds it.  Also
+ * triggers a new request for transmission if the pending
+ * list was previously empty.
+ *
+ * @param nl client to notify
+ * @param hc entry to notify about
+ */
+static void
+do_notify (struct NotifyList *nl,
+          struct HostEntry *he)
+{
+  struct PendingEntry *pe;
+
+  pe = nl->pending;
+  while (NULL != pe)
+    {
+      if (pe->he == he)
+       return; /* already in list */
+      pe = pe->next;
+    }
+  pe = GNUNET_malloc (sizeof (struct PendingEntry));
+  pe->next = nl->pending;
+  pe->he = he;
+  nl->pending = pe;
+  if (nl->transmit_ctx != NULL)
+    return; /* already trying to transmit */
+  nl->transmit_ctx = GNUNET_SERVER_notify_transmit_ready (nl->client,
+                                                         sizeof (struct InfoMessage) + GNUNET_HELLO_size (he->hello),
+                                                         GNUNET_TIME_UNIT_FOREVER_REL,
+                                                         &transmit_pending_notification,
+                                                         nl);
+}
+
+
+/**
+ * Notify all clients in the notify list about the
+ * given host entry changing.
+ */
+static void
+notify_all (struct HostEntry *he)
+{
+  struct NotifyList *nl;
+
+  nl = notify_list;
+  while (NULL != nl)
+    {
+      do_notify (nl, he);
+      nl = nl->next;
+    }
+}
+
+
 /**
  * Address iterator that causes expired entries to be discarded.
  *
@@ -231,6 +396,7 @@ add_host_to_known_hosts (const struct GNUNET_PeerIdentity *identity)
   GNUNET_free (fn);
   entry->next = hosts;
   hosts = entry;
+  notify_all (entry);
 }
 
 
@@ -246,6 +412,7 @@ static int
 change_host_trust (const struct GNUNET_PeerIdentity *hostId, int value)
 {
   struct HostEntry *host;
+  unsigned int old_trust;
 
   if (value == 0)
     return 0;
@@ -256,6 +423,7 @@ change_host_trust (const struct GNUNET_PeerIdentity *hostId, int value)
       host = lookup_host_entry (hostId);
     }
   GNUNET_assert (host != NULL);
+  old_trust = host->trust;
   if (value > 0)
     {
       if (host->trust + value < host->trust)
@@ -276,6 +444,8 @@ change_host_trust (const struct GNUNET_PeerIdentity *hostId, int value)
       else
         host->trust += value;
     }
+  if (host->trust != old_trust)
+    notify_all (host);
   return value;
 }
 
@@ -383,6 +553,8 @@ bind_address (const struct GNUNET_PeerIdentity *peer,
   else
     {
       mrg = GNUNET_HELLO_merge (host->hello, hello);
+      /* FIXME: check if old and merged hello are equal,
+        and if so, bail out early... */
       GNUNET_free (host->hello);
       host->hello = mrg;
     }
@@ -393,6 +565,7 @@ bind_address (const struct GNUNET_PeerIdentity *peer,
                        GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
                        | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ);
   GNUNET_free (fn);
+  notify_all (host);
 }
 
 
@@ -642,6 +815,35 @@ handle_get_all (void *cls,
 }
 
 
+/**
+ * Handle NOTIFY-message.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ */
+static void
+handle_notify (void *cls,
+            struct GNUNET_SERVER_Client *client,
+            const struct GNUNET_MessageHeader *message)
+{
+  struct NotifyList *nl;
+  struct HostEntry *pos;
+
+  nl = GNUNET_malloc (sizeof (struct NotifyList));
+  nl->next = notify_list;
+  nl->client = client;
+  GNUNET_SERVER_client_keep (client);  
+  notify_list = nl;
+  pos = hosts;
+  while (NULL != pos)
+    {
+      do_notify (nl, pos);
+      pos = pos->next;
+    }
+}
+
+
 /**
  * List of handlers for the messages understood by this
  * service.
@@ -652,10 +854,57 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
    sizeof (struct ListPeerMessage)},
   {&handle_get_all, NULL, GNUNET_MESSAGE_TYPE_PEERINFO_GET_ALL,
    sizeof (struct ListAllPeersMessage)},
+  {&handle_notify, NULL, GNUNET_MESSAGE_TYPE_PEERINFO_NOTIFY,
+   sizeof (struct GNUNET_MessageHeader)},
   {NULL, NULL, 0, 0}
 };
 
 
+/**
+ * Function that is called when a client disconnects.
+ */
+static void
+notify_disconnect (void *cls,
+                  struct GNUNET_SERVER_Client *client)
+{
+  struct NotifyList *pos;
+  struct NotifyList *prev;
+  struct NotifyList *next;
+  struct PendingEntry *p;
+
+  pos = notify_list;
+  prev = NULL;
+  while (pos != NULL)
+    {
+      next = pos->next;
+      if (pos->client == client)
+       {
+         while (NULL != (p = pos->pending))
+           {
+             pos->pending = p->next;
+             GNUNET_free (p);
+           }
+         if (pos->transmit_ctx != NULL)
+           {
+             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_ctx);
+             pos->transmit_ctx = NULL;
+           }
+         if (prev == NULL)
+           notify_list = next;
+         else
+           prev->next = next;
+          GNUNET_SERVER_client_drop (client);
+         GNUNET_free (pos);
+       }
+      else
+       {
+         prev = pos;
+       }
+      pos = next;
+    }
+
+}
+
 
 /**
  * Process statistics requests.
@@ -692,6 +941,7 @@ run (void *cls,
   GNUNET_SCHEDULER_add_with_priority (sched,
                                      GNUNET_SCHEDULER_PRIORITY_IDLE,
                                      &cron_clean_data_hosts, NULL);
+  GNUNET_SERVER_disconnect_notify (server, &notify_disconnect, NULL);
   GNUNET_SERVER_add_handlers (server, handlers);
 }
 
index b5c6c99d71680bff9c19a5dc912ad6cb0c1a7efd..906c01d77287fa5d9fe5cd2c9f16a7d2eadad61e 100644 (file)
@@ -298,4 +298,247 @@ GNUNET_PEERINFO_for_all (const struct GNUNET_CONFIGURATION_Handle *cfg,
     }
 }
 
+
+
+/**
+ * Context for the info handler.
+ */
+struct GNUNET_PEERINFO_NotifyContext
+{
+
+  /**
+   * Our connection to the PEERINFO service.
+   */
+  struct GNUNET_CLIENT_Connection *client;
+
+  /**
+   * Function to call with information.
+   */
+  GNUNET_PEERINFO_Processor callback;
+
+  /**
+   * Closure for callback.
+   */
+  void *callback_cls;
+
+  /**
+   * Handle to our initial request for message transmission to
+   * the peerinfo service.
+   */
+  struct GNUNET_CLIENT_TransmitHandle *init;
+
+  /**
+   * Configuration.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
+   * Scheduler.
+   */
+  struct GNUNET_SCHEDULER_Handle *sched;
+};
+
+
+/**
+ * Send a request to the peerinfo service to start being
+ * notified about all changes to peer information.
+ *
+ * @param nc our context
+ */
+static void
+request_notifications (struct GNUNET_PEERINFO_NotifyContext *nc);
+
+
+/**
+ * Read notifications from the client handle and pass them
+ * to the callback.
+ *
+ * @param nc our context
+ */
+static void
+receive_notifications (struct GNUNET_PEERINFO_NotifyContext *nc);
+
+
+/**
+ * Receive a peerinfo information message, process it and
+ * go for more.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+process_notification (void *cls,
+                     const struct
+                     GNUNET_MessageHeader * msg)
+{
+  struct GNUNET_PEERINFO_NotifyContext *nc = cls;
+  const struct InfoMessage *im;
+  const struct GNUNET_HELLO_Message *hello;
+  uint16_t ms;
+
+  if (msg == NULL)
+    {
+      GNUNET_CLIENT_disconnect (nc->client);
+      nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+      request_notifications (nc);
+      return;
+    }
+  ms = ntohs (msg->size);
+  if ((ms < sizeof (struct InfoMessage)) ||
+      (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_PEERINFO_INFO))
+    {
+      GNUNET_break (0);
+      GNUNET_CLIENT_disconnect (nc->client);
+      nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+      request_notifications (nc);
+      return;
+    }
+  im = (const struct InfoMessage *) msg;
+  hello = NULL;
+  if (ms > sizeof (struct InfoMessage) + sizeof (struct GNUNET_MessageHeader))
+    {
+      hello = (const struct GNUNET_HELLO_Message *) &im[1];
+      if (ms != sizeof (struct InfoMessage) + GNUNET_HELLO_size (hello))
+        {
+          GNUNET_break (0);
+         GNUNET_CLIENT_disconnect (nc->client);
+         nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+         request_notifications (nc);
+          return;
+        }
+    }
+#if DEBUG_PEERINFO
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received information about peer `%s' from peerinfo database\n",
+             GNUNET_i2s (&im->peer));
+#endif
+  nc->callback (nc->callback_cls, &im->peer, hello, ntohl (im->trust));
+  receive_notifications (nc);
+}
+
+
+/**
+ * Read notifications from the client handle and pass them
+ * to the callback.
+ *
+ * @param nc our context
+ */
+static void
+receive_notifications (struct GNUNET_PEERINFO_NotifyContext *nc)
+{
+  GNUNET_CLIENT_receive (nc->client,
+                        &process_notification,
+                        nc,
+                        GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Transmit our init-notify request, start receiving.
+ *
+ * @param cls closure (our 'struct GNUNET_PEERINFO_NotifyContext')
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t 
+transmit_notify_request (void *cls,
+                        size_t size, 
+                        void *buf)
+{
+  struct GNUNET_PEERINFO_NotifyContext *nc = cls;
+  struct GNUNET_MessageHeader hdr;
+
+  nc->init = NULL;
+  if (buf == NULL)
+    {
+      GNUNET_CLIENT_disconnect (nc->client);
+      nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+      request_notifications (nc);
+      return 0;
+    }
+  GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
+  hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
+  hdr.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_NOTIFY);
+  memcpy (buf, &hdr, sizeof (struct GNUNET_MessageHeader));
+  receive_notifications (nc);
+  return sizeof (struct GNUNET_MessageHeader);
+}
+
+
+/**
+ * Send a request to the peerinfo service to start being
+ * notified about all changes to peer information.
+ *
+ * @param nc our context
+ */
+static void
+request_notifications (struct GNUNET_PEERINFO_NotifyContext *nc)
+{
+  GNUNET_assert (NULL == nc->init);
+  nc->init =GNUNET_CLIENT_notify_transmit_ready (nc->client,
+                                                sizeof (struct GNUNET_MessageHeader),
+                                                GNUNET_TIME_UNIT_FOREVER_REL,
+                                                GNUNET_YES,
+                                                &transmit_notify_request,
+                                                nc);
+}
+
+
+/**
+ * Call a method whenever our known information about peers
+ * changes.  Initially calls the given function for all known
+ * peers and then only signals changes.
+ *
+ * @param cfg configuration to use
+ * @param sched scheduler to use
+ * @param callback the method to call for each peer
+ * @param callback_cls closure for callback
+ * @return NULL on error
+ */
+struct GNUNET_PEERINFO_NotifyContext *
+GNUNET_PEERINFO_notify (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                       struct GNUNET_SCHEDULER_Handle *sched,
+                       GNUNET_PEERINFO_Processor callback,
+                       void *callback_cls)
+{
+  struct GNUNET_PEERINFO_NotifyContext *nc;
+  struct GNUNET_CLIENT_Connection *client;
+
+  client = GNUNET_CLIENT_connect (sched, "peerinfo", cfg);
+  if (client == NULL)
+    {      
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  _("Could not connect to `%s' service.\n"), "peerinfo");
+      return NULL;
+    }
+  nc = GNUNET_malloc (sizeof (struct GNUNET_PEERINFO_NotifyContext));
+  nc->sched = sched;
+  nc->cfg = cfg;
+  nc->client = client;
+  nc->callback = callback;
+  nc->callback_cls = callback_cls; 
+  request_notifications (nc);
+  return nc;
+}
+
+
+/**
+ * Stop notifying about changes.
+ *
+ * @param nc context to stop notifying
+ */
+void
+GNUNET_PEERINFO_notify_cancel (struct GNUNET_PEERINFO_NotifyContext *nc)
+{
+  if (NULL != nc->init)
+    {
+      GNUNET_CLIENT_notify_transmit_ready_cancel (nc->init);
+      nc->init = NULL;
+    }
+  GNUNET_CLIENT_disconnect (nc->client);
+  GNUNET_free (nc);
+}
+
+
 /* end of peerinfo_api.c */