-ensure stats queues do not grow too big
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
index 9ebfa66dac2f0014ba4ef6fd97c1521d7b3488a5..94f9d2f88e8ce276430f830747feffae267cc7b8 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2009 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -157,17 +157,17 @@ struct Channel
   /**
    * Public key of the target group.
    */
-  struct GNUNET_CRYPTO_EddsaPublicKey group_key;
+  struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
 
   /**
-   * Hash of @a group_key.
+   * Hash of @a group_pub_key.
    */
-  struct GNUNET_HashCode group_key_hash;
+  struct GNUNET_HashCode group_pub_hash;
 
   /**
    * Public key of the joining member.
    */
-  struct GNUNET_CRYPTO_EcdsaPublicKey member_key;
+  struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
 
   /**
    * Remote peer identity.
@@ -180,6 +180,11 @@ struct Channel
    */
   int8_t join_status;
 
+  /**
+   * Number of messages waiting to be sent to CADET.
+   */
+  uint8_t msgs_pending;
+
   /**
    * Channel direction.
    * @see enum ChannelDirection
@@ -322,10 +327,9 @@ struct ReplayRequestKey
  * Task run during shutdown.
  *
  * @param cls unused
- * @param tc unused
  */
 static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls)
 {
   if (NULL != core)
   {
@@ -443,6 +447,7 @@ replay_req_remove_cadet (struct Channel *chn)
     if (c == chn)
     {
       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
+      GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
       return GNUNET_YES;
     }
   }
@@ -480,6 +485,7 @@ replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client
     if (c == client)
     {
       GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
+      GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
       return GNUNET_YES;
     }
   }
@@ -619,8 +625,10 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
 /**
  * Send message to all origin and member clients connected to the group.
  *
- * @param grp  The group to send @a msg to.
- * @param msg  Message to send.
+ * @param pub_key_hash
+ *        H(key_pub) of the group.
+ * @param msg
+ *        Message to send.
  */
 static int
 client_send_all (struct GNUNET_HashCode *pub_key_hash,
@@ -660,8 +668,10 @@ client_send_random (struct GNUNET_HashCode *pub_key_hash,
 /**
  * Send message to all origin clients connected to the group.
  *
- * @param grp  The group to send @a msg to.
- * @param msg  Message to send.
+ * @param pub_key_hash
+ *        H(key_pub) of the group.
+ * @param msg
+ *        Message to send.
  */
 static int
 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
@@ -675,6 +685,33 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash,
 }
 
 
+/**
+ * Send fragment acknowledgement to all clients of the channel.
+ *
+ * @param pub_key_hash
+ *        H(key_pub) of the group.
+ */
+static void
+client_send_ack (struct GNUNET_HashCode *pub_key_hash)
+{
+  static struct GNUNET_MessageHeader *msg = NULL;
+  if (NULL == msg)
+  {
+    msg = GNUNET_malloc (sizeof (*msg));
+    msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
+    msg->size = htons (sizeof (*msg));
+  }
+  client_send_all (pub_key_hash, msg);
+}
+
+
+struct CadetTransmitClosure
+{
+  struct Channel *chn;
+  const struct GNUNET_MessageHeader *msg;
+};
+
+
 /**
  * CADET is ready to transmit a message.
  */
@@ -686,10 +723,21 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
     /* FIXME: connection closed */
     return 0;
   }
-  const struct GNUNET_MessageHeader *msg = cls;
-  uint16_t msg_size = ntohs (msg->size);
+  struct CadetTransmitClosure *tcls = cls;
+  struct Channel *chn = tcls->chn;
+  uint16_t msg_size = ntohs (tcls->msg->size);
   GNUNET_assert (msg_size <= buf_size);
-  memcpy (buf, msg, msg_size);
+  memcpy (buf, tcls->msg, msg_size);
+  GNUNET_free (tcls);
+
+  if (0 == chn->msgs_pending)
+  {
+    GNUNET_break (0);
+  }
+  else if (0 == --chn->msgs_pending)
+  {
+    client_send_ack (&chn->group_pub_hash);
+  }
   return msg_size;
 }
 
@@ -703,12 +751,17 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
 static void
 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
 {
+  struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
+  tcls->chn = chn;
+  tcls->msg = msg;
+
+  chn->msgs_pending++;
   chn->tmit_handle
     = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
                                           GNUNET_TIME_UNIT_FOREVER_REL,
                                           ntohs (msg->size),
                                           &cadet_notify_transmit_ready,
-                                          (void *) msg);
+                                          tcls);
   GNUNET_assert (NULL != chn->tmit_handle);
 }
 
@@ -718,10 +771,10 @@ cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
  *
  * @param peer
  *        Peer to connect to.
- * @param group_key
+ * @param group_pub_key
  *        Public key of group the channel belongs to.
- * @param group_key_hash
- *        Hash of @a group_key.
+ * @param group_pub_hash
+ *        Hash of @a group_pub_key.
  *
  * @return Channel.
  */
@@ -730,15 +783,15 @@ cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
 {
   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
   chn->grp = grp;
-  chn->group_key = grp->pub_key;
-  chn->group_key_hash = grp->pub_key_hash;
+  chn->group_pub_key = grp->pub_key;
+  chn->group_pub_hash = grp->pub_key_hash;
   chn->peer = *peer;
   chn->direction = DIR_OUTGOING;
   chn->join_status = JOIN_WAITING;
   chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
                                               GNUNET_APPLICATION_TYPE_MULTICAST,
                                               GNUNET_CADET_OPTION_RELIABLE);
-  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_key_hash, chn,
+  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   return chn;
 }
@@ -765,13 +818,13 @@ cadet_send_join_request (struct Member *mem)
 
 static int
 cadet_send_join_decision_cb (void *cls,
-                             const struct GNUNET_HashCode *group_key_hash,
+                             const struct GNUNET_HashCode *group_pub_hash,
                              void *channel)
 {
   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
   struct Channel *chn = channel;
 
-  if (0 == memcmp (&hdcsn->member_key, &chn->member_key, sizeof (chn->member_key))
+  if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
   {
     cadet_send_channel (chn, &hdcsn->header);
@@ -906,7 +959,7 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
 
   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
-  GNUNET_CRYPTO_hash (&msg->group_key, sizeof (msg->group_key), &pub_key_hash);
+  GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
 
   struct GNUNET_CONTAINER_MultiHashMap *
     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
@@ -927,7 +980,7 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
 
     grp = &mem->grp;
     grp->is_origin = GNUNET_NO;
-    grp->pub_key = msg->group_key;
+    grp->pub_key = msg->group_pub_key;
     grp->pub_key_hash = pub_key_hash;
 
     if (NULL == grp_mem)
@@ -994,13 +1047,13 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
       req = GNUNET_malloc (sizeof (*req) + join_msg_size);
     req->header.size = htons (sizeof (*req) + join_msg_size);
     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
-    req->group_key = grp->pub_key;
+    req->group_pub_key = grp->pub_key;
     req->peer = this_peer;
-    GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_key);
+    GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
     if (0 < join_msg_size)
       memcpy (&req[1], join_msg, join_msg_size);
 
-    req->member_key = mem->pub_key;
+    req->member_pub_key = mem->pub_key;
     req->purpose.size = htonl (msg_size
                                - sizeof (req->header)
                                - sizeof (req->reserved)
@@ -1076,7 +1129,7 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
   if (NULL != grp_mem)
   {
     struct GNUNET_HashCode member_key_hash;
-    GNUNET_CRYPTO_hash (&hdcsn->member_key, sizeof (hdcsn->member_key),
+    GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
                         &member_key_hash);
     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1132,7 +1185,10 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
   }
 
   client_send_all (&grp->pub_key_hash, &out->header);
-  cadet_send_children (&grp->pub_key_hash, &out->header);
+  if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
+  {
+    client_send_ack (&grp->pub_key_hash);
+  }
   GNUNET_free (out);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1160,11 +1216,11 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
 
   /* FIXME: yucky, should use separate message structs for P2P and CS! */
   out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m);
-  out->member_key = mem->pub_key;
+  out->member_pub_key = mem->pub_key;
   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
   out->purpose.size = htonl (ntohs (out->header.size)
                              - sizeof (out->header)
-                             - sizeof (out->member_key)
+                             - sizeof (out->member_pub_key)
                              - sizeof (out->signature));
   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
 
@@ -1174,11 +1230,13 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
     GNUNET_assert (0);
   }
 
+  uint8_t send_ack = GNUNET_YES;
   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
   { /* No local origins, send to remote origin */
     if (NULL != mem->origin_channel)
     {
       cadet_send_channel (mem->origin_channel, &out->header);
+      send_ack = GNUNET_NO;
     }
     else
     {
@@ -1188,6 +1246,10 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
       return;
     }
   }
+  if (GNUNET_YES == send_ack)
+  {
+    client_send_ack (&grp->pub_key_hash);
+  }
   GNUNET_free (out);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -1490,26 +1552,26 @@ cadet_recv_join_request (void *cls,
   if (GNUNET_OK !=
       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
                                   &req->purpose, &req->signature,
-                                  &req->member_key))
+                                  &req->member_pub_key))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
 
-  struct GNUNET_HashCode group_key_hash;
-  GNUNET_CRYPTO_hash (&req->group_key, sizeof (req->group_key), &group_key_hash);
+  struct GNUNET_HashCode group_pub_hash;
+  GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
 
   struct Channel *chn = GNUNET_malloc (sizeof *chn);
   chn->channel = channel;
-  chn->group_key = req->group_key;
-  chn->group_key_hash = group_key_hash;
-  chn->member_key = req->member_key;
+  chn->group_pub_key = req->group_pub_key;
+  chn->group_pub_hash = group_pub_hash;
+  chn->member_pub_key = req->member_pub_key;
   chn->peer = req->peer;
   chn->join_status = JOIN_WAITING;
-  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_key_hash, chn,
+  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 
-  client_send_all (&group_key_hash, m);
+  client_send_all (&group_pub_hash, m);
   return GNUNET_OK;
 }
 
@@ -1609,13 +1671,13 @@ cadet_recv_message (void *cls,
   if (GNUNET_OK !=
       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
                                   &msg->purpose, &msg->signature,
-                                  &chn->group_key))
+                                  &chn->group_pub_key))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
 
-  client_send_all (&chn->group_key_hash, m);
+  client_send_all (&chn->group_pub_hash, m);
   return GNUNET_OK;
 }
 
@@ -1645,7 +1707,7 @@ cadet_recv_request (void *cls,
   }
   if (ntohl (req->purpose.size) != (size
                                     - sizeof (req->header)
-                                    - sizeof (req->member_key)
+                                    - sizeof (req->member_pub_key)
                                     - sizeof (req->signature)))
   {
     GNUNET_break_op (0);
@@ -1654,13 +1716,13 @@ cadet_recv_request (void *cls,
   if (GNUNET_OK !=
       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
                                   &req->purpose, &req->signature,
-                                  &req->member_key))
+                                  &req->member_pub_key))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
 
-  client_send_origin (&chn->group_key_hash, m);
+  client_send_origin (&chn->group_pub_hash, m);
   return GNUNET_OK;
 }
 
@@ -1684,7 +1746,7 @@ cadet_recv_replay_request (void *cls,
   struct Channel *chn = *ctx;
 
   memcpy (&rep, m, sizeof (rep));
-  memcpy (&rep.member_key, &chn->member_key, sizeof (chn->member_key));
+  memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
 
   struct GNUNET_CONTAINER_MultiHashMap *
     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
@@ -1702,7 +1764,7 @@ cadet_recv_replay_request (void *cls,
   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 
-  client_send_random (&chn->group_key_hash, &rep.header);
+  client_send_random (&chn->group_pub_hash, &rep.header);
   return GNUNET_OK;
 }
 
@@ -1777,10 +1839,10 @@ core_connected_cb  (void *cls, const struct GNUNET_PeerIdentity *my_identity)
 
   nc = GNUNET_SERVER_notification_context_create (server, 1);
   GNUNET_SERVER_add_handlers (server, server_handlers);
-  GNUNET_SERVER_disconnect_notify (server, &client_notify_disconnect, NULL);
-
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
-                                NULL);
+  GNUNET_SERVER_disconnect_notify (server,
+                                  &client_notify_disconnect, NULL);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                NULL);
 }
 
 
@@ -1792,7 +1854,8 @@ core_connected_cb  (void *cls, const struct GNUNET_PeerIdentity *my_identity)
  * @param cfg configuration to use
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *srv,
+run (void *cls,
+     struct GNUNET_SERVER_Handle *srv,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
   cfg = c;