-ensure stats queues do not grow too big
[oweals/gnunet.git] / src / multicast / multicast_api.c
index 117a0efe25d7955ff891fc78f8e396a6fe072297..ce36ef6f2b423dd67fdf66e5dacce1cac4b4a897 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2012, 2013 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -101,6 +101,11 @@ struct GNUNET_MULTICAST_Group
    */
   uint8_t in_transmit;
 
+  /**
+   * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+   */
+  uint8_t acks_pending;
+
   /**
    * Is this the origin or a member?
    */
@@ -158,7 +163,7 @@ struct GNUNET_MULTICAST_JoinHandle
   /**
    * Public key of the member requesting join.
    */
-  struct GNUNET_CRYPTO_EcdsaPublicKey member_key;
+  struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
 
   /**
    * Peer identity of the member requesting join.
@@ -185,6 +190,13 @@ struct GNUNET_MULTICAST_MemberReplayHandle
 };
 
 
+static void
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
+
+static void
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
+
+
 /**
  * Send first message to the service after connecting.
  */
@@ -192,9 +204,10 @@ static void
 group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
 {
   uint16_t cmsg_size = ntohs (grp->connect_msg->size);
-  struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+  struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
   memcpy (cmsg, grp->connect_msg, cmsg_size);
   GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
+  GNUNET_free (cmsg);
 }
 
 
@@ -242,9 +255,9 @@ group_recv_join_request (void *cls,
     jmsg = NULL;
   jh = GNUNET_malloc (sizeof (*jh));
   jh->group = grp;
-  jh->member_key = jreq->member_key;
+  jh->member_pub_key = jreq->member_pub_key;
   jh->peer = jreq->peer;
-  grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
+  grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
 }
 
 
@@ -273,6 +286,38 @@ group_recv_message (void *cls,
 }
 
 
+/**
+ * Receive message/request fragment acknowledgement from service.
+ */
+static void
+group_recv_fragment_ack (void *cls,
+                         struct GNUNET_CLIENT_MANAGER_Connection *client,
+                         const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *
+    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+       grp, grp->in_transmit, grp->acks_pending);
+
+  if (0 == grp->acks_pending)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p Ignoring extraneous fragment ACK.\n", grp);
+    return;
+  }
+  grp->acks_pending--;
+
+  if (GNUNET_YES != grp->in_transmit)
+    return;
+
+  if (GNUNET_YES == grp->is_origin)
+    origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+  else
+    member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+}
+
 /**
  * Origin receives uniquest request from a member.
  */
@@ -322,7 +367,7 @@ group_recv_replay_request (void *cls,
       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
       rh->grp = grp;
       rh->req = *rep;
-      grp->replay_frag_cb (grp->cb_cls, &rep->member_key,
+      grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
                            GNUNET_ntohll (rep->fragment_id),
                            GNUNET_ntohll (rep->flags), rh);
     }
@@ -334,7 +379,7 @@ group_recv_replay_request (void *cls,
       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
       rh->grp = grp;
       rh->req = *rep;
-      grp->replay_msg_cb (grp->cb_cls, &rep->member_key,
+      grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
                           GNUNET_ntohll (rep->message_id),
                           GNUNET_ntohll (rep->fragment_offset),
                           GNUNET_ntohll (rep->flags), rh);
@@ -447,6 +492,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
     sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
 
+  { group_recv_fragment_ack, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
   { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -470,6 +519,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
     sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
 
+  { group_recv_fragment_ack, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
   { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -563,7 +616,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
   hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
                               + relay_size + join_resp_size);
   hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
-  hdcsn->member_key = join->member_key;
+  hdcsn->member_pub_key = join->member_pub_key;
   hdcsn->peer = join->peer;
 
   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
@@ -577,6 +630,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
     memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
 
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+  GNUNET_free (hdcsn);
   GNUNET_free (join);
   return NULL;
 }
@@ -774,9 +828,10 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
 static void
 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
   struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
@@ -786,7 +841,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "OriginTransmitNotify() returned error or invalid message size.\n");
+         "%p OriginTransmitNotify() returned error or invalid message size.\n",
+         orig);
     /* FIXME: handle error */
     GNUNET_free (msg);
     return;
@@ -794,6 +850,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 
   if (GNUNET_NO == ret && 0 == buf_size)
   {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p OriginTransmitNotify() - transmission paused.\n", orig);
     GNUNET_free (msg);
     return; /* Transmission paused. */
   }
@@ -805,7 +863,12 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*msg) + buf_size;
 
+  grp->acks_pending++;
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
+  GNUNET_free (msg);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -834,15 +897,15 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
                                 void *notify_cls)
 {
-/* FIXME
-  if (GNUNET_YES == orig->grp.in_transmit)
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  if (GNUNET_YES == grp->in_transmit)
     return NULL;
-  orig->grp.in_transmit = GNUNET_YES;
-*/
+  grp->in_transmit = GNUNET_YES;
 
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
   tmit->origin = orig;
   tmit->message_id = message_id;
+  tmit->fragment_offset = 0;
   tmit->group_generation = group_generation;
   tmit->notify = notify;
   tmit->notify_cls = notify_cls;
@@ -861,6 +924,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
 void
 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   origin_to_all (th->origin);
 }
 
@@ -874,6 +940,7 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan
 void
 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  th->origin->grp.in_transmit = GNUNET_NO;
 }
 
 
@@ -930,7 +997,7 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
  */
 struct GNUNET_MULTICAST_Member *
 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
+                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
                               const struct GNUNET_PeerIdentity *origin,
                               uint16_t relay_count,
@@ -952,7 +1019,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
     join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
   join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
   join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
-  join->group_key = *group_key;
+  join->group_pub_key = *group_pub_key;
   join->member_key = *member_key;
   join->origin = *origin;
   join->relay_count = ntohl (relay_count);
@@ -1057,6 +1124,7 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
                                          uint64_t flags)
 {
   member_replay_request (mem, fragment_id, 0, 0, flags);
+  // FIXME: return
 }
 
 
@@ -1085,6 +1153,7 @@ GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
                                         uint64_t flags)
 {
   member_replay_request (mem, 0, message_id, fragment_offset, flags);
+  // FIXME: return
 }
 
 
@@ -1094,6 +1163,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
   struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
@@ -1124,6 +1194,10 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
   tmit->fragment_offset += sizeof (*req) + buf_size;
 
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
+  GNUNET_free (req);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -1147,15 +1221,14 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
                                    void *notify_cls)
 {
-/* FIXME
   if (GNUNET_YES == mem->grp.in_transmit)
     return NULL;
   mem->grp.in_transmit = GNUNET_YES;
-*/
 
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
   tmit->member = mem;
   tmit->request_id = request_id;
+  tmit->fragment_offset = 0;
   tmit->notify = notify;
   tmit->notify_cls = notify_cls;
 
@@ -1173,6 +1246,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
 void
 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   member_to_origin (th->member);
 }
 
@@ -1186,6 +1262,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit
 void
 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  th->member->grp.in_transmit = GNUNET_NO;
 }