convert fs publish to MQ
[oweals/gnunet.git] / src / multicast / multicast_api.c
index aa6dd3d9886519d7459035317b7d0c5a6db51446..ce36ef6f2b423dd67fdf66e5dacce1cac4b4a897 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2012, 2013 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -81,7 +81,6 @@ struct GNUNET_MULTICAST_Group
   struct GNUNET_MessageHeader *connect_msg;
 
   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
-  GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
   GNUNET_MULTICAST_MessageCallback message_cb;
@@ -102,6 +101,11 @@ struct GNUNET_MULTICAST_Group
    */
   uint8_t in_transmit;
 
+  /**
+   * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+   */
+  uint8_t acks_pending;
+
   /**
    * Is this the origin or a member?
    */
@@ -159,7 +163,7 @@ struct GNUNET_MULTICAST_JoinHandle
   /**
    * Public key of the member requesting join.
    */
-  struct GNUNET_CRYPTO_EcdsaPublicKey member_key;
+  struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
 
   /**
    * Peer identity of the member requesting join.
@@ -168,14 +172,6 @@ struct GNUNET_MULTICAST_JoinHandle
 };
 
 
-/**
- * Handle to pass back for the answer of a membership test.
- */
-struct GNUNET_MULTICAST_MembershipTestHandle
-{
-};
-
-
 /**
  * Opaque handle to a replay request from the multicast service.
  */
@@ -191,12 +187,16 @@ struct GNUNET_MULTICAST_ReplayHandle
  */
 struct GNUNET_MULTICAST_MemberReplayHandle
 {
-
-  GNUNET_MULTICAST_ResultCallback result_cb;
-  void *result_cls;
 };
 
 
+static void
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
+
+static void
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
+
+
 /**
  * Send first message to the service after connecting.
  */
@@ -204,9 +204,10 @@ static void
 group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
 {
   uint16_t cmsg_size = ntohs (grp->connect_msg->size);
-  struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+  struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
   memcpy (cmsg, grp->connect_msg, cmsg_size);
   GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
+  GNUNET_free (cmsg);
 }
 
 
@@ -254,9 +255,9 @@ group_recv_join_request (void *cls,
     jmsg = NULL;
   jh = GNUNET_malloc (sizeof (*jh));
   jh->group = grp;
-  jh->member_key = jreq->member_key;
+  jh->member_pub_key = jreq->member_pub_key;
   jh->peer = jreq->peer;
-  grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
+  grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
 }
 
 
@@ -285,6 +286,38 @@ group_recv_message (void *cls,
 }
 
 
+/**
+ * Receive message/request fragment acknowledgement from service.
+ */
+static void
+group_recv_fragment_ack (void *cls,
+                         struct GNUNET_CLIENT_MANAGER_Connection *client,
+                         const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *
+    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+       grp, grp->in_transmit, grp->acks_pending);
+
+  if (0 == grp->acks_pending)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p Ignoring extraneous fragment ACK.\n", grp);
+    return;
+  }
+  grp->acks_pending--;
+
+  if (GNUNET_YES != grp->in_transmit)
+    return;
+
+  if (GNUNET_YES == grp->is_origin)
+    origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+  else
+    member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+}
+
 /**
  * Origin receives uniquest request from a member.
  */
@@ -334,7 +367,7 @@ group_recv_replay_request (void *cls,
       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
       rh->grp = grp;
       rh->req = *rep;
-      grp->replay_frag_cb (grp->cb_cls, &rep->member_key,
+      grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
                            GNUNET_ntohll (rep->fragment_id),
                            GNUNET_ntohll (rep->flags), rh);
     }
@@ -346,7 +379,7 @@ group_recv_replay_request (void *cls,
       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
       rh->grp = grp;
       rh->req = *rep;
-      grp->replay_msg_cb (grp->cb_cls, &rep->member_key,
+      grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
                           GNUNET_ntohll (rep->message_id),
                           GNUNET_ntohll (rep->fragment_offset),
                           GNUNET_ntohll (rep->flags), rh);
@@ -459,6 +492,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
     sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
 
+  { group_recv_fragment_ack, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
   { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -482,6 +519,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
     sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
 
+  { group_recv_fragment_ack, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
   { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -575,7 +616,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
   hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
                               + relay_size + join_resp_size);
   hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
-  hdcsn->member_key = join->member_key;
+  hdcsn->member_pub_key = join->member_pub_key;
   hdcsn->peer = join->peer;
 
   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
@@ -589,27 +630,12 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
     memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
 
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+  GNUNET_free (hdcsn);
   GNUNET_free (join);
   return NULL;
 }
 
 
-/**
- * Call informing multicast about the decision taken for a membership test.
- *
- * @param mth
- *        Handle that was given for the query.
- * @param result
- *        #GNUNET_YES if peer was a member, #GNUNET_NO if peer was not a member,
- *        #GNUNET_SYSERR if we cannot answer the membership test.
- */
-void
-GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestHandle *mth,
-                                         int result)
-{
-}
-
-
 /**
  * Replay a message fragment for the multicast group.
  *
@@ -722,8 +748,6 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
  *        0 for a new group.
  * @param join_request_cb
  *        Function called to approve / disapprove joining of a peer.
- * @param member_test_cb
- *        Function multicast can use to test group membership.
  * @param replay_frag_cb
  *        Function that can be called to replay a message fragment.
  * @param replay_msg_cb
@@ -744,7 +768,6 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
                                uint64_t max_fragment_id,
                                GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
-                               GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
                                GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
                                GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
                                GNUNET_MULTICAST_RequestCallback request_cb,
@@ -766,7 +789,6 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
   grp->cb_cls = cls;
   grp->join_req_cb = join_request_cb;
-  grp->member_test_cb = member_test_cb;
   grp->replay_frag_cb = replay_frag_cb;
   grp->replay_msg_cb = replay_msg_cb;
   grp->message_cb = message_cb;
@@ -806,9 +828,10 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
 static void
 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
   struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
@@ -818,7 +841,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "OriginTransmitNotify() returned error or invalid message size.\n");
+         "%p OriginTransmitNotify() returned error or invalid message size.\n",
+         orig);
     /* FIXME: handle error */
     GNUNET_free (msg);
     return;
@@ -826,6 +850,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 
   if (GNUNET_NO == ret && 0 == buf_size)
   {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p OriginTransmitNotify() - transmission paused.\n", orig);
     GNUNET_free (msg);
     return; /* Transmission paused. */
   }
@@ -837,7 +863,12 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*msg) + buf_size;
 
+  grp->acks_pending++;
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
+  GNUNET_free (msg);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -866,15 +897,15 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
                                 void *notify_cls)
 {
-/* FIXME
-  if (GNUNET_YES == orig->grp.in_transmit)
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  if (GNUNET_YES == grp->in_transmit)
     return NULL;
-  orig->grp.in_transmit = GNUNET_YES;
-*/
+  grp->in_transmit = GNUNET_YES;
 
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
   tmit->origin = orig;
   tmit->message_id = message_id;
+  tmit->fragment_offset = 0;
   tmit->group_generation = group_generation;
   tmit->notify = notify;
   tmit->notify_cls = notify_cls;
@@ -893,6 +924,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
 void
 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   origin_to_all (th->origin);
 }
 
@@ -906,6 +940,7 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan
 void
 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  th->origin->grp.in_transmit = GNUNET_NO;
 }
 
 
@@ -917,9 +952,8 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
  * @a message_cb is invoked with a (failure) response and then with NULL.  If
  * the join succeeds, outstanding (state) messages and ongoing multicast
  * messages will be given to the @a message_cb until the member decides to part
- * the group.  The @a test_cb and @a replay_cb functions may be called at
- * anytime by the multicast service to support relaying messages to other
- * members of the group.
+ * the group.  The @a replay_cb function may be called at any time by the
+ * multicast service to support relaying messages to other members of the group.
  *
  * @param cfg
  *        Configuration to use.
@@ -944,8 +978,6 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
  *        Function called to approve / disapprove joining of a peer.
  * @param join_decision_cb
  *        Function called to inform about the join decision.
- * @param member_test_cb
- *        Function multicast can use to test group membership.
  * @param replay_frag_cb
  *        Function that can be called to replay message fragments
  *        this peer already knows from this group. NULL if this
@@ -965,7 +997,7 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
  */
 struct GNUNET_MULTICAST_Member *
 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
+                              const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
                               const struct GNUNET_PeerIdentity *origin,
                               uint16_t relay_count,
@@ -973,7 +1005,6 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
                               const struct GNUNET_MessageHeader *join_msg,
                               GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
                               GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
-                              GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
                               GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
                               GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
                               GNUNET_MULTICAST_MessageCallback message_cb,
@@ -988,7 +1019,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
     join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
   join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
   join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
-  join->group_key = *group_key;
+  join->group_pub_key = *group_pub_key;
   join->member_key = *member_key;
   join->origin = *origin;
   join->relay_count = ntohl (relay_count);
@@ -1003,7 +1034,6 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
   mem->join_dcsn_cb = join_decision_cb;
   grp->join_req_cb = join_request_cb;
-  grp->member_test_cb = member_test_cb;
   grp->replay_frag_cb = replay_frag_cb;
   grp->replay_msg_cb = replay_msg_cb;
   grp->message_cb = message_cb;
@@ -1085,21 +1115,16 @@ member_replay_request (struct GNUNET_MULTICAST_Member *mem,
  * @param flags
  *        Additional flags for the replay request.
  *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
- * @param result_cb
- *        Function to call when the replayed message fragment arrives.
- * @param result_cls
- *        Closure for @a result_cb.
  *
  * @return Replay request handle.
  */
 struct GNUNET_MULTICAST_MemberReplayHandle *
 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
                                          uint64_t fragment_id,
-                                         uint64_t flags,
-                                         GNUNET_MULTICAST_ResultCallback result_cb,
-                                         void *result_cls)
+                                         uint64_t flags)
 {
   member_replay_request (mem, fragment_id, 0, 0, flags);
+  // FIXME: return
 }
 
 
@@ -1118,10 +1143,6 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
  * @param flags
  *        Additional flags for the replay request.
  *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
- * @param result_cb
- *        Function to call for each replayed message fragment.
- * @param result_cls
- *        Closure for @a result_cb.
  *
  * @return Replay request handle, NULL on error.
  */
@@ -1129,23 +1150,10 @@ struct GNUNET_MULTICAST_MemberReplayHandle *
 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
                                         uint64_t message_id,
                                         uint64_t fragment_offset,
-                                        uint64_t flags,
-                                        GNUNET_MULTICAST_ResultCallback result_cb,
-                                        void *result_cls)
+                                        uint64_t flags)
 {
   member_replay_request (mem, 0, message_id, fragment_offset, flags);
-}
-
-
-/**
- * Cancel a replay request.
- *
- * @param rh
- *        Request to cancel.
- */
-void
-GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh)
-{
+  // FIXME: return
 }
 
 
@@ -1155,6 +1163,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
   struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
@@ -1185,6 +1194,10 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
   tmit->fragment_offset += sizeof (*req) + buf_size;
 
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
+  GNUNET_free (req);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -1208,15 +1221,14 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
                                    void *notify_cls)
 {
-/* FIXME
   if (GNUNET_YES == mem->grp.in_transmit)
     return NULL;
   mem->grp.in_transmit = GNUNET_YES;
-*/
 
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
   tmit->member = mem;
   tmit->request_id = request_id;
+  tmit->fragment_offset = 0;
   tmit->notify = notify;
   tmit->notify_cls = notify_cls;
 
@@ -1234,6 +1246,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
 void
 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   member_to_origin (th->member);
 }
 
@@ -1247,6 +1262,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit
 void
 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  th->member->grp.in_transmit = GNUNET_NO;
 }