-doxygen
[oweals/gnunet.git] / src / dv / dv_api.c
index 3f9f1142360a38788ae80db37c1060f78ceddbfe..7ea00599501ebeb25c6c05c97dd5700f0fdd85fa 100644 (file)
 #define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__)
 
 
+/**
+ * Information we track for each peer.
+ */
+struct ConnectedPeer;
+
+
 /**
  * Handle for a send operation.
  */
@@ -60,10 +66,10 @@ struct GNUNET_DV_TransmitHandle
   GNUNET_DV_MessageSentCallback cb;
 
   /**
-   * Closure for 'cb'.
+   * Closure for @a cb.
    */
   void *cb_cls;
-  
+
   /**
    * The actual message (allocated at the end of this struct).
    */
@@ -72,7 +78,42 @@ struct GNUNET_DV_TransmitHandle
   /**
    * Destination for the message.
    */
-  struct GNUNET_PeerIdentity target;
+  struct ConnectedPeer *target;
+
+  /**
+   * UID of our message, if any.
+   */
+  uint32_t uid;
+
+};
+
+
+/**
+ * Information we track for each peer.
+ */
+struct ConnectedPeer
+{
+
+  /**
+   * Identity of the peer.
+   */
+  struct GNUNET_PeerIdentity pid;
+
+  /**
+   * Head of DLL of transmission handles where we need
+   * to invoke a continuation when we are informed about
+   * successful transmission.  The respective request
+   * has already been sent to the DV service.
+   */
+  struct GNUNET_DV_TransmitHandle *head;
+
+  /**
+   * Tail of DLL of transmission handles where we need
+   * to invoke a continuation when we are informed about
+   * successful transmission.  The respective request
+   * has already been sent to the DV service.
+   */
+  struct GNUNET_DV_TransmitHandle *tail;
 
 };
 
@@ -102,12 +143,17 @@ struct GNUNET_DV_ServiceHandle
    * Closure for the callbacks.
    */
   void *cls;
-  
+
   /**
    * Function to call on connect events.
    */
   GNUNET_DV_ConnectCallback connect_cb;
 
+  /**
+   * Function to call on distance change events.
+   */
+  GNUNET_DV_DistanceChangedCallback distance_cb;
+
   /**
    * Function to call on disconnect events.
    */
@@ -129,11 +175,10 @@ struct GNUNET_DV_ServiceHandle
   struct GNUNET_DV_TransmitHandle *th_tail;
 
   /**
-   * Mapping of peer identities to TransmitHandles to invoke
-   * upon successful transmission.  The respective
-   * transmissions have already been done.
+   * Information tracked per connected peer.  Maps peer
+   * identities to `struct ConnectedPeer` entries.
    */
-  struct GNUNET_CONTAINER_MultiHashMap *send_callbacks;
+  struct GNUNET_CONTAINER_MultiPeerMap *peers;
 
   /**
    * Current unique ID
@@ -158,12 +203,14 @@ reconnect (struct GNUNET_DV_ServiceHandle *sh);
  * @param cls handle to the dv service (struct GNUNET_DV_ServiceHandle)
  * @param size how many bytes can we send
  * @param buf where to copy the message to send
- * @return how many bytes we copied to buf
+ * @return how many bytes we copied to @a buf
  */
 static size_t
 transmit_pending (void *cls, size_t size, void *buf)
 {
   struct GNUNET_DV_ServiceHandle *sh = cls;
+  char *cbuf = buf;
+  struct GNUNET_DV_TransmitHandle *th;
   size_t ret;
   size_t tsize;
 
@@ -174,14 +221,24 @@ transmit_pending (void *cls, size_t size, void *buf)
     return 0;
   }
   ret = 0;
-  // FIXME: yuck! -- copy multiple, remove from DLL, and add to hash map!
-  if (NULL != sh->th_head)
+  while ( (NULL != (th = sh->th_head)) &&
+         (size - ret >= (tsize = ntohs (th->msg->size)) ))
   {
-    tsize = ntohs (sh->th_head->msg->size);
-    if (size >= tsize)
+    GNUNET_CONTAINER_DLL_remove (sh->th_head,
+                                sh->th_tail,
+                                th);
+    memcpy (&cbuf[ret], th->msg, tsize);
+    th->msg = NULL;
+    ret += tsize;
+    if (NULL != th->cb)
     {
-      memcpy (buf, sh->th_head->msg, tsize);
-      return tsize;
+      GNUNET_CONTAINER_DLL_insert (th->target->head,
+                                   th->target->tail,
+                                   th);
+    }
+    else
+    {
+      GNUNET_free (th);
     }
   }
   return ret;
@@ -199,16 +256,50 @@ start_transmit (struct GNUNET_DV_ServiceHandle *sh)
   if (NULL != sh->th)
     return;
   if (NULL == sh->th_head)
-    return; 
+    return;
   sh->th =
     GNUNET_CLIENT_notify_transmit_ready (sh->client,
                                         ntohs (sh->th_head->msg->size),
                                         GNUNET_TIME_UNIT_FOREVER_REL,
-                                        GNUNET_NO, 
+                                        GNUNET_NO,
                                         &transmit_pending, sh);
 }
 
 
+/**
+ * We got disconnected from the service and thus all of the
+ * pending send callbacks will never be confirmed.  Clean up.
+ *
+ * @param cls the 'struct GNUNET_DV_ServiceHandle'
+ * @param key a peer identity
+ * @param value a `struct ConnectedPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+cleanup_send_cb (void *cls,
+                const struct GNUNET_PeerIdentity *key,
+                void *value)
+{
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  struct ConnectedPeer *peer = value;
+  struct GNUNET_DV_TransmitHandle *th;
+
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (sh->peers,
+                                                      key,
+                                                      peer));
+  sh->disconnect_cb (sh->cls,
+                     key);
+  while (NULL != (th = peer->head))
+  {
+    GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th);
+    th->cb (th->cb_cls, GNUNET_SYSERR);
+    GNUNET_free (th);
+  }
+  return GNUNET_OK;
+}
+
+
 /**
  * Handles a message sent from the DV service to us.
  * Parse it out and give it to the plugin.
@@ -217,13 +308,19 @@ start_transmit (struct GNUNET_DV_ServiceHandle *sh)
  * @param msg the message that was received
  */
 static void
-handle_message_receipt (void *cls, 
+handle_message_receipt (void *cls,
                        const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_DV_ServiceHandle *sh = cls;
   const struct GNUNET_DV_ConnectMessage *cm;
+  const struct GNUNET_DV_DistanceUpdateMessage *dum;
   const struct GNUNET_DV_DisconnectMessage *dm;
   const struct GNUNET_DV_ReceivedMessage *rm;
+  const struct GNUNET_MessageHeader *payload;
+  const struct GNUNET_DV_AckMessage *ack;
+  struct GNUNET_DV_TransmitHandle *th;
+  struct GNUNET_DV_TransmitHandle *tn;
+  struct ConnectedPeer *peer;
 
   if (NULL == msg)
   {
@@ -231,6 +328,9 @@ handle_message_receipt (void *cls,
     reconnect (sh);
     return;
   }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received message of type %u from DV service\n",
+       (unsigned int) msg->type);
   switch (ntohs (msg->type))
   {
   case GNUNET_MESSAGE_TYPE_DV_CONNECT:
@@ -241,7 +341,36 @@ handle_message_receipt (void *cls,
       return;
     }
     cm = (const struct GNUNET_DV_ConnectMessage *) msg;
-    // FIXME
+    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                              &cm->peer);
+    if (NULL != peer)
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    peer = GNUNET_new (struct ConnectedPeer);
+    peer->pid = cm->peer;
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multipeermap_put (sh->peers,
+                                                      &peer->pid,
+                                                      peer,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    sh->connect_cb (sh->cls,
+                   &cm->peer,
+                   ntohl (cm->distance), ntohl (cm->network));
+    break;
+  case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
+    sh->distance_cb (sh->cls,
+                    &dum->peer,
+                    ntohl (dum->distance));
     break;
   case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
     if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
@@ -251,23 +380,85 @@ handle_message_receipt (void *cls,
       return;
     }
     dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
-    // FIXME
+    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                              &dm->peer);
+    if (NULL == peer)
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    tn = sh->th_head;
+    while (NULL != (th = tn))
+    {
+      tn = th->next;
+      if (peer == th->target)
+      {
+        GNUNET_CONTAINER_DLL_remove (sh->th_head,
+                                     sh->th_tail,
+                                     th);
+        th->cb (th->cb_cls, GNUNET_SYSERR);
+        GNUNET_free (th);
+      }
+    }
+    cleanup_send_cb (sh, &dm->peer, peer);
     break;
   case GNUNET_MESSAGE_TYPE_DV_RECV:
-    if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage))
+    if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader))
     {
       GNUNET_break (0);
       reconnect (sh);
       return;
     }
     rm = (const struct GNUNET_DV_ReceivedMessage *) msg;
-    // FIXME
+    payload = (const struct GNUNET_MessageHeader *) &rm[1];
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    sh->message_cb (sh->cls,
+                   &rm->sender,
+                   ntohl (rm->distance),
+                   payload);
+    break;
+  case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
+  case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    ack = (const struct GNUNET_DV_AckMessage *) msg;
+    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                              &ack->target);
+    if (NULL == peer)
+      return; /* this happens, just ignore */
+    for (th = peer->head; NULL != th; th = th->next)
+    {
+      if (th->uid != ntohl (ack->uid))
+        continue;
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Matched ACK for message to peer %s\n",
+           GNUNET_i2s (&ack->target));
+      GNUNET_CONTAINER_DLL_remove (peer->head,
+                                   peer->tail,
+                                   th);
+      th->cb (th->cb_cls,
+              (ntohs (ack->header.type) == GNUNET_MESSAGE_TYPE_DV_SEND_ACK)
+              ? GNUNET_OK
+              : GNUNET_SYSERR);
+      GNUNET_free (th);
+      break;
+    }
     break;
   default:
     reconnect (sh);
     break;
   }
-  GNUNET_CLIENT_receive (sh->client, 
+  GNUNET_CLIENT_receive (sh->client,
                         &handle_message_receipt, sh,
                          GNUNET_TIME_UNIT_FOREVER_REL);
 }
@@ -276,11 +467,11 @@ handle_message_receipt (void *cls,
 /**
  * Transmit the start message to the DV service.
  *
- * @param cls the 'struct GNUNET_DV_ServiceHandle'
+ * @param cls the `struct GNUNET_DV_ServiceHandle *`
  * @param size number of bytes available in buf
  * @param buf where to copy the message
  * @return number of bytes written to buf
- */ 
+ */
 static size_t
 transmit_start (void *cls,
                size_t size,
@@ -326,6 +517,11 @@ reconnect (struct GNUNET_DV_ServiceHandle *sh)
     GNUNET_CLIENT_disconnect (sh->client);
     sh->client = NULL;
   }
+  GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
+                                        &cleanup_send_cb,
+                                        sh);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Connecting to DV service\n");
   sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg);
   if (NULL == sh->client)
   {
@@ -347,6 +543,7 @@ reconnect (struct GNUNET_DV_ServiceHandle *sh)
  * @param cfg configuration
  * @param cls closure for callbacks
  * @param connect_cb function to call on connects
+ * @param distance_cb function to call if distances change
  * @param disconnect_cb function to call on disconnects
  * @param message_cb function to call if we receive messages
  * @return handle to access the service
@@ -355,18 +552,20 @@ struct GNUNET_DV_ServiceHandle *
 GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
                           void *cls,
                           GNUNET_DV_ConnectCallback connect_cb,
+                          GNUNET_DV_DistanceChangedCallback distance_cb,
                           GNUNET_DV_DisconnectCallback disconnect_cb,
                           GNUNET_DV_MessageReceivedCallback message_cb)
 {
   struct GNUNET_DV_ServiceHandle *sh;
 
-  sh = GNUNET_malloc (sizeof (struct GNUNET_DV_ServiceHandle));
+  sh = GNUNET_new (struct GNUNET_DV_ServiceHandle);
   sh->cfg = cfg;
   sh->cls = cls;
   sh->connect_cb = connect_cb;
+  sh->distance_cb = distance_cb;
   sh->disconnect_cb = disconnect_cb;
   sh->message_cb = message_cb;
-  sh->send_callbacks = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_YES);
+  sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
   reconnect (sh);
   return sh;
 }
@@ -381,7 +580,7 @@ void
 GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
 {
   struct GNUNET_DV_TransmitHandle *pos;
-  
+
   if (NULL == sh)
     return;
   if (NULL != sh->th)
@@ -396,13 +595,15 @@ GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
                                 pos);
     GNUNET_free (pos);
   }
-  if (NULL != sh->client) 
+  if (NULL != sh->client)
   {
     GNUNET_CLIENT_disconnect (sh->client);
     sh->client = NULL;
   }
-  // FIXME: handle and/or free entries in 'send_callbacks'!
-  GNUNET_CONTAINER_multihashmap_destroy (sh->send_callbacks);
+  GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
+                                        &cleanup_send_cb,
+                                        sh);
+  GNUNET_CONTAINER_multipeermap_destroy (sh->peers);
   GNUNET_free (sh);
 }
 
@@ -414,7 +615,7 @@ GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
  * @param target intended recpient
  * @param msg message payload
  * @param cb function to invoke when done
- * @param cb_cls closure for 'cb'
+ * @param cb_cls closure for @a cb
  * @return handle to cancel the operation
  */
 struct GNUNET_DV_TransmitHandle *
@@ -425,16 +626,45 @@ GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
                void *cb_cls)
 {
   struct GNUNET_DV_TransmitHandle *th;
+  struct GNUNET_DV_SendMessage *sm;
+  struct ConnectedPeer *peer;
 
+  if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to send %u bytes of type %u to %s\n",
+       (unsigned int) msg->size,
+       (unsigned int) msg->type,
+       GNUNET_i2s (target));
+  peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                            target);
+  if (NULL == peer)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
   th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) +
+                     sizeof (struct GNUNET_DV_SendMessage) +
                      ntohs (msg->size));
   th->sh = sh;
-  th->target = *target;
+  th->target = peer;
   th->cb = cb;
   th->cb_cls = cb_cls;
-  // FIXME: wrong, need to box 'msg' AND generate UID!
   th->msg = (const struct GNUNET_MessageHeader *) &th[1];
-  memcpy (&th[1], msg, ntohs (msg->size));
+  sm = (struct GNUNET_DV_SendMessage *) &th[1];
+  sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
+  sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
+                          ntohs (msg->size));
+  if (0 == sh->uid_gen)
+    sh->uid_gen = 1;
+  th->uid = sh->uid_gen;
+  sm->uid = htonl (sh->uid_gen++);
+  /* use memcpy here as 'target' may not be sufficiently aligned */
+  memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
+  memcpy (&sm[1], msg, ntohs (msg->size));
   GNUNET_CONTAINER_DLL_insert (sh->th_head,
                               sh->th_tail,
                               th);
@@ -454,12 +684,12 @@ void
 GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
 {
   struct GNUNET_DV_ServiceHandle *sh = th->sh;
-  int ret;
 
-  ret = GNUNET_CONTAINER_multihashmap_remove (sh->send_callbacks,
-                                             &th->target.hashPubKey,
-                                             th);
-  if (GNUNET_YES != ret)
+  if (NULL == th->msg)
+    GNUNET_CONTAINER_DLL_remove (th->target->head,
+                                th->target->tail,
+                                th);
+  else
     GNUNET_CONTAINER_DLL_remove (sh->th_head,
                                 sh->th_tail,
                                 th);