multicast: switch to new cadet api
authortg(x) <*@tg-x.net>
Wed, 22 Feb 2017 20:59:56 +0000 (21:59 +0100)
committertg(x) <*@tg-x.net>
Wed, 22 Feb 2017 20:59:56 +0000 (21:59 +0100)
src/multicast/Makefile.am
src/multicast/gnunet-service-multicast.c
src/multicast/test_multicast_multipeer.c

index d5bb0d46ba09101c0bb0637fb83b0b3a2b399a2a..bac856b00a1cf946fe039af087dd28ce5d6e8003 100644 (file)
@@ -45,7 +45,7 @@ gnunet_service_multicast_SOURCES = \
  gnunet-service-multicast.c
 gnunet_service_multicast_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
-  $(top_builddir)/src/cadet/libgnunetcadet.la \
+  $(top_builddir)/src/cadet/libgnunetcadetnew.la \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
   $(GN_LIBINTL)
 
index de65e0ab743a773531cffefefca95a81ce17dfbd..554962d9946bfe167871c37b975eae32dc33673e 100644 (file)
@@ -162,6 +162,16 @@ struct Channel
    */
   struct GNUNET_PeerIdentity peer;
 
+  /**
+   * Current window size, set by cadet_notify_window_change()
+   */
+  int32_t window_size;
+
+  /**
+   * Is the connection established?
+   */
+  int8_t is_connected;
+
   /**
    * Is the remote peer admitted to the group?
    * @see enum JoinStatus
@@ -336,6 +346,17 @@ struct ReplayRequestKey
 };
 
 
+static struct Channel *
+cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer);
+
+static void
+cadet_channel_destroy (struct Channel *chn);
+
+static void
+client_send_join_decision (struct Member *mem,
+                           const struct MulticastJoinDecisionMessageHeader *hdcsn);
+
+
 /**
  * Task run during shutdown.
  *
@@ -671,36 +692,6 @@ struct CadetTransmitClosure
 };
 
 
-/**
- * CADET is ready to transmit a message.
- */
-size_t
-cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
-{
-  if (0 == buf_size)
-  {
-    /* FIXME: connection closed */
-    return 0;
-  }
-  struct CadetTransmitClosure *tcls = cls;
-  struct Channel *chn = tcls->chn;
-  uint16_t msg_size = ntohs (tcls->msg->size);
-  GNUNET_assert (msg_size <= buf_size);
-  GNUNET_memcpy (buf, tcls->msg, msg_size);
-  GNUNET_free (tcls);
-
-  if (0 == chn->msgs_pending)
-  {
-    GNUNET_break (0);
-  }
-  else if (0 == --chn->msgs_pending)
-  {
-    client_send_ack (&chn->group_pub_hash);
-  }
-  return msg_size;
-}
-
-
 /**
  * Send a message to a CADET channel.
  *
@@ -710,53 +701,19 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
 static void
 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
 {
-  uint16_t msg_size = ntohs (msg->size);
-  struct GNUNET_MessageHeader *msg_copy = GNUNET_malloc (msg_size);
-  GNUNET_memcpy (msg_copy, msg, msg_size);
-
-  struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
-  tcls->chn = chn;
-  tcls->msg = msg_copy;
-
-  chn->msgs_pending++;
-  chn->tmit_handle
-    = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                          msg_size,
-                                          &cadet_notify_transmit_ready,
-                                          tcls);
-  GNUNET_assert (NULL != chn->tmit_handle);
-}
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_copy (msg);
 
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env);
 
-/**
- * Create new outgoing CADET channel.
- *
- * @param peer
- *        Peer to connect to.
- * @param group_pub_key
- *        Public key of group the channel belongs to.
- * @param group_pub_hash
- *        Hash of @a group_pub_key.
- *
- * @return Channel.
- */
-static struct Channel *
-cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
-{
-  struct Channel *chn = GNUNET_malloc (sizeof (*chn));
-  chn->group = grp;
-  chn->group_pub_key = grp->pub_key;
-  chn->group_pub_hash = grp->pub_key_hash;
-  chn->peer = *peer;
-  chn->direction = DIR_OUTGOING;
-  chn->join_status = JOIN_WAITING;
-  chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
-                                              &grp->cadet_port_hash,
-                                              GNUNET_CADET_OPTION_RELIABLE);
-  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  return chn;
+  if (0 < chn->window_size)
+  {
+    client_send_ack (&chn->group_pub_hash);
+  }
+  else
+  {
+    chn->msgs_pending++;
+  }
 }
 
 
@@ -787,7 +744,7 @@ cadet_send_join_decision_cb (void *cls,
   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
   struct Channel *chn = channel;
 
-  const struct MulticastJoinDecisionMessage *dcsn = 
+  const struct MulticastJoinDecisionMessage *dcsn =
     (struct MulticastJoinDecisionMessage *) &hdcsn[1];
 
   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
@@ -870,61 +827,490 @@ cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
 
 
 /**
- * New incoming CADET channel.
+ * CADET channel connect handler.
+ *
+ * @see GNUNET_CADET_ConnectEventHandler()
  */
 static void *
-cadet_notify_channel_new (void *cls,
-                          struct GNUNET_CADET_Channel *channel,
-                          const struct GNUNET_PeerIdentity *initiator,
-                          const struct GNUNET_HashCode *port,
-                          enum GNUNET_CADET_ChannelOption options)
+cadet_notify_connect (void *cls,
+                      struct GNUNET_CADET_Channel *channel,
+                      const struct GNUNET_PeerIdentity *source)
 {
-  return NULL;
+  struct Channel *chn = GNUNET_malloc (sizeof *chn);
+  chn->group = cls;
+  chn->channel = channel;
+  chn->direction = DIR_INCOMING;
+  chn->join_status = JOIN_NOT_ASKED;
+
+  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  return chn;
 }
 
 
 /**
- * CADET channel is being destroyed.
+ * CADET window size change handler.
+ *
+ * @see GNUNET_CADET_WindowSizeEventHandler()
  */
 static void
-cadet_notify_channel_end (void *cls,
-                          const struct GNUNET_CADET_Channel *channel,
-                          void *ctx)
+cadet_notify_window_change (void *cls,
+                            const struct GNUNET_CADET_Channel *channel,
+                            int window_size)
 {
-  if (NULL == ctx)
+  struct Channel *chn = cls;
+  chn->is_connected = GNUNET_YES;
+  chn->window_size = (int32_t) window_size;
+
+  for (int i = 0; i < window_size; i++)
+  {
+    if (0 < chn->msgs_pending)
+    {
+      client_send_ack (&chn->group_pub_hash);
+      --chn->msgs_pending;
+    }
+    else
+    {
+      break;
+    }
+  }
+}
+
+
+/**
+ * CADET channel disconnect handler.
+ *
+ * @see GNUNET_CADET_DisconnectEventHandler()
+ */
+static void
+cadet_notify_disconnect (void *cls,
+                         const struct GNUNET_CADET_Channel *channel)
+{
+  if (NULL == cls)
     return;
 
-  struct Channel *chn = ctx;
-  if (NULL != chn->group)
-  {
-    if (GNUNET_NO == chn->group->is_origin)
-    {
-      struct Member *mem = (struct Member *) chn->group;
-      if (chn == mem->origin_channel)
-        mem->origin_channel = NULL;
-    }
-  }
+  struct Channel *chn = cls;
+  if (NULL != chn->group)
+  {
+    if (GNUNET_NO == chn->group->is_origin)
+    {
+      struct Member *mem = (struct Member *) chn->group;
+      if (chn == mem->origin_channel)
+        mem->origin_channel = NULL;
+    }
+  }
+
+  int ret;
+  do
+  {
+    ret = replay_req_remove_cadet (chn);
+  }
+  while (GNUNET_YES == ret);
+
+  GNUNET_free (chn);
+}
+
+
+static int
+check_cadet_join_request (void *cls,
+                          const struct MulticastJoinRequestMessage *req)
+{
+  struct Channel *chn = cls;
+
+  if (NULL == chn
+      || JOIN_NOT_ASKED != chn->join_status)
+  {
+    return GNUNET_SYSERR;
+  }
+
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (req->purpose.size) != (size
+                                    - sizeof (req->header)
+                                    - sizeof (req->reserved)
+                                    - sizeof (req->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
+                                  &req->purpose, &req->signature,
+                                  &req->member_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming join request message from CADET.
+ */
+static void
+handle_cadet_join_request (void *cls,
+                           const struct MulticastJoinRequestMessage *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  struct GNUNET_HashCode group_pub_hash;
+  GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
+  chn->group_pub_key = req->group_pub_key;
+  chn->group_pub_hash = group_pub_hash;
+  chn->member_pub_key = req->member_pub_key;
+  chn->peer = req->peer;
+  chn->join_status = JOIN_WAITING;
+
+  client_send_all (&group_pub_hash, &req->header);
+}
+
+
+static int
+check_cadet_join_decision (void *cls,
+                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  uint16_t size = ntohs (hdcsn->header.size);
+  if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
+             sizeof (struct MulticastJoinDecisionMessage))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  switch (chn->join_status)
+  {
+  case JOIN_REFUSED:
+    return GNUNET_SYSERR;
+
+  case JOIN_ADMITTED:
+    return GNUNET_OK;
+
+  case JOIN_NOT_ASKED:
+  case JOIN_WAITING:
+    break;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming join decision message from CADET.
+ */
+static void
+handle_cadet_join_decision (void *cls,
+                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  const struct MulticastJoinDecisionMessage *
+    dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
+
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
+  struct Member *mem = (struct Member *) chn->group;
+  client_send_join_decision (mem, hdcsn);
+  if (GNUNET_YES == ntohl (dcsn->is_admitted))
+  {
+    chn->join_status = JOIN_ADMITTED;
+  }
+  else
+  {
+    chn->join_status = JOIN_REFUSED;
+    cadet_channel_destroy (chn);
+  }
+}
+
+
+static int
+check_cadet_message (void *cls,
+                     const struct GNUNET_MULTICAST_MessageHeader *msg)
+{
+  uint16_t size = ntohs (msg->header.size);
+  if (size < sizeof (*msg))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (msg->purpose.size) != (size
+                                    - sizeof (msg->header)
+                                    - sizeof (msg->hop_counter)
+                                    - sizeof (msg->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
+                                  &msg->purpose, &msg->signature,
+                                  &chn->group_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast message from CADET.
+ */
+static void
+handle_cadet_message (void *cls,
+                      const struct GNUNET_MULTICAST_MessageHeader *msg)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+  client_send_all (&chn->group_pub_hash, &msg->header);
+}
+
+
+static int
+check_cadet_request (void *cls,
+                     const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  if (ntohl (req->purpose.size) != (size
+                                    - sizeof (req->header)
+                                    - sizeof (req->member_pub_key)
+                                    - sizeof (req->signature)))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
+                                  &req->purpose, &req->signature,
+                                  &req->member_pub_key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast request message from CADET.
+ */
+static void
+handle_cadet_request (void *cls,
+                      const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+  client_send_origin (&chn->group_pub_hash, &req->header);
+}
+
+
+static int
+check_cadet_replay_request (void *cls,
+                            const struct MulticastReplayRequestMessage *req)
+{
+  uint16_t size = ntohs (req->header.size);
+  if (size < sizeof (*req))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay request from CADET.
+ */
+static void
+handle_cadet_replay_request (void *cls,
+                             const struct MulticastReplayRequestMessage *req)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  struct MulticastReplayRequestMessage rep = *req;
+  GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                        &chn->group->pub_key_hash);
+  if (NULL == grp_replay_req)
+  {
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+    GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
+                                       &chn->group->pub_key_hash, grp_replay_req,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
+                   rep.flags, &key_hash);
+  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+  client_send_random (&chn->group_pub_hash, &rep.header);
+}
+
+
+static int
+check_cadet_replay_response (void *cls,
+                             const struct MulticastReplayResponseMessage *res)
+{
+  struct Channel *chn = cls;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay response from CADET.
+ */
+static void
+handle_cadet_replay_response (void *cls,
+                              const struct MulticastReplayResponseMessage *res)
+{
+  struct Channel *chn = cls;
+  GNUNET_CADET_receive_done (chn->channel);
+
+  /* @todo FIXME: got replay error response, send request to other members */
+}
+
+
+static void
+group_set_cadet_port_hash (struct Group *grp)
+{
+  struct CadetPort {
+    struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+    uint32_t app_type;
+  } port = {
+    grp->pub_key,
+    GNUNET_APPLICATION_TYPE_MULTICAST,
+  };
+  GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
+}
+
+
+
+/**
+ * Create new outgoing CADET channel.
+ *
+ * @param peer
+ *        Peer to connect to.
+ * @param group_pub_key
+ *        Public key of group the channel belongs to.
+ * @param group_pub_hash
+ *        Hash of @a group_pub_key.
+ *
+ * @return Channel.
+ */
+static struct Channel *
+cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
+{
+  struct Channel *chn = GNUNET_malloc (sizeof (*chn));
+  chn->group = grp;
+  chn->group_pub_key = grp->pub_key;
+  chn->group_pub_hash = grp->pub_key_hash;
+  chn->peer = *peer;
+  chn->direction = DIR_OUTGOING;
+  chn->is_connected = GNUNET_NO;
+  chn->join_status = JOIN_WAITING;
+
+  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+    GNUNET_MQ_hd_var_size (cadet_message,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                           struct GNUNET_MULTICAST_MessageHeader,
+                           chn),
+
+    GNUNET_MQ_hd_var_size (cadet_join_decision,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+                           struct MulticastJoinDecisionMessageHeader,
+                           chn),
 
-  while (GNUNET_YES == replay_req_remove_cadet (chn));
+    GNUNET_MQ_hd_var_size (cadet_replay_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                           struct MulticastReplayRequestMessage,
+                           chn),
 
-  GNUNET_free (chn);
+    GNUNET_MQ_hd_var_size (cadet_replay_response,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                           struct MulticastReplayResponseMessage,
+                           chn),
+
+    GNUNET_MQ_handler_end ()
+  };
+
+  chn->channel = GNUNET_CADET_channel_creatE (cadet, chn, &chn->peer,
+                                              &grp->cadet_port_hash,
+                                              GNUNET_CADET_OPTION_RELIABLE,
+                                              cadet_notify_window_change,
+                                              cadet_notify_disconnect,
+                                              cadet_handlers);
+  GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  return chn;
 }
 
 
+/**
+ * Destroy outgoing CADET channel.
+ */
 static void
-group_set_cadet_port_hash (struct Group *grp)
+cadet_channel_destroy (struct Channel *chn)
 {
-  struct CadetPort {
-    struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
-    uint32_t app_type;
-  } port = {
-    grp->pub_key,
-    GNUNET_APPLICATION_TYPE_MULTICAST,
-  };
-  GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
+  GNUNET_CADET_channel_destroy (chn->channel);
+  GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash);
+  GNUNET_free (chn);
 }
 
-
 /**
  * Handle a connecting client starting an origin.
  */
@@ -961,8 +1347,44 @@ handle_client_origin_start (void *cls,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
 
     group_set_cadet_port_hash (grp);
-    orig->cadet_port = GNUNET_CADET_open_port (cadet, &grp->cadet_port_hash,
-                                               cadet_notify_channel_new, NULL);
+
+    struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+      GNUNET_MQ_hd_var_size (cadet_message,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                             struct GNUNET_MULTICAST_MessageHeader,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+                             struct GNUNET_MULTICAST_RequestHeader,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_join_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                             struct MulticastJoinRequestMessage,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_replay_request,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                             struct MulticastReplayRequestMessage,
+                             grp),
+
+      GNUNET_MQ_hd_var_size (cadet_replay_response,
+                             GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                             struct MulticastReplayResponseMessage,
+                             grp),
+
+      GNUNET_MQ_handler_end ()
+    };
+
+
+    orig->cadet_port = GNUNET_CADET_open_porT (cadet,
+                                               &grp->cadet_port_hash,
+                                               cadet_notify_connect,
+                                               NULL,
+                                               cadet_notify_window_change,
+                                               cadet_notify_disconnect,
+                                               cadet_handlers);
   }
   else
   {
@@ -1543,278 +1965,6 @@ handle_client_replay_response (void *cls,
 }
 
 
-/**
- * Incoming join request message from CADET.
- */
-int
-cadet_recv_join_request (void *cls,
-                         struct GNUNET_CADET_Channel *channel,
-                         void **ctx,
-                         const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  const struct MulticastJoinRequestMessage *
-    req = (const struct MulticastJoinRequestMessage *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*req))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (NULL != *ctx)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (req->purpose.size) != (size
-                                    - sizeof (req->header)
-                                    - sizeof (req->reserved)
-                                    - sizeof (req->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
-                                  &req->purpose, &req->signature,
-                                  &req->member_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  struct GNUNET_HashCode group_pub_hash;
-  GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
-
-  struct Channel *chn = GNUNET_malloc (sizeof *chn);
-  chn->channel = channel;
-  chn->group_pub_key = req->group_pub_key;
-  chn->group_pub_hash = group_pub_hash;
-  chn->member_pub_key = req->member_pub_key;
-  chn->peer = req->peer;
-  chn->join_status = JOIN_WAITING;
-  GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  *ctx = chn;
-
-  client_send_all (&group_pub_hash, m);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming join decision message from CADET.
- */
-int
-cadet_recv_join_decision (void *cls,
-                          struct GNUNET_CADET_Channel *channel,
-                          void **ctx,
-                          const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done (channel);
-  const struct MulticastJoinDecisionMessageHeader *
-    hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m;
-  const struct MulticastJoinDecisionMessage *
-    dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
-             sizeof (struct MulticastJoinDecisionMessage))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  switch (chn->join_status)
-  {
-  case JOIN_REFUSED:
-    return GNUNET_SYSERR;
-
-  case JOIN_ADMITTED:
-    return GNUNET_OK;
-
-  case JOIN_NOT_ASKED:
-  case JOIN_WAITING:
-    break;
-  }
-
-  // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
-  struct Member *mem = (struct Member *) chn->group;
-  client_send_join_decision (mem, hdcsn);
-  if (GNUNET_YES == ntohl (dcsn->is_admitted))
-  {
-    chn->join_status = JOIN_ADMITTED;
-    return GNUNET_OK;
-  }
-  else
-  {
-    chn->join_status = JOIN_REFUSED;
-    return GNUNET_SYSERR;
-  }
-}
-
-/**
- * Incoming multicast message from CADET.
- */
-int
-cadet_recv_message (void *cls,
-                    struct GNUNET_CADET_Channel *channel,
-                    void **ctx,
-                    const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  const struct GNUNET_MULTICAST_MessageHeader *
-    msg = (const struct GNUNET_MULTICAST_MessageHeader *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*msg))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (msg->purpose.size) != (size
-                                    - sizeof (msg->header)
-                                    - sizeof (msg->hop_counter)
-                                    - sizeof (msg->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
-                                  &msg->purpose, &msg->signature,
-                                  &chn->group_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  client_send_all (&chn->group_pub_hash, m);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming multicast request message from CADET.
- */
-int
-cadet_recv_request (void *cls,
-                    struct GNUNET_CADET_Channel *channel,
-                    void **ctx,
-                    const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  const struct GNUNET_MULTICAST_RequestHeader *
-    req = (const struct GNUNET_MULTICAST_RequestHeader *) m;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (*req))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-  if (NULL == chn)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (ntohl (req->purpose.size) != (size
-                                    - sizeof (req->header)
-                                    - sizeof (req->member_pub_key)
-                                    - sizeof (req->signature)))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  if (GNUNET_OK !=
-      GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
-                                  &req->purpose, &req->signature,
-                                  &req->member_pub_key))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  client_send_origin (&chn->group_pub_hash, m);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming multicast replay request from CADET.
- */
-int
-cadet_recv_replay_request (void *cls,
-                           struct GNUNET_CADET_Channel *channel,
-                           void **ctx,
-                           const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  struct MulticastReplayRequestMessage rep;
-  uint16_t size = ntohs (m->size);
-  if (size < sizeof (rep))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  struct Channel *chn = *ctx;
-
-  GNUNET_memcpy (&rep, m, sizeof (rep));
-  GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
-
-  struct GNUNET_CONTAINER_MultiHashMap *
-    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
-                                                        &chn->group->pub_key_hash);
-  if (NULL == grp_replay_req)
-  {
-    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
-    GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
-                                       &chn->group->pub_key_hash, grp_replay_req,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-  }
-  struct GNUNET_HashCode key_hash;
-  replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
-                   rep.flags, &key_hash);
-  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  client_send_random (&chn->group_pub_hash, &rep.header);
-  return GNUNET_OK;
-}
-
-
-/**
- * Incoming multicast replay response from CADET.
- */
-int
-cadet_recv_replay_response (void *cls,
-                            struct GNUNET_CADET_Channel *channel,
-                            void **ctx,
-                            const struct GNUNET_MessageHeader *m)
-{
-  GNUNET_CADET_receive_done(channel);
-  //struct Channel *chn = *ctx;
-
-  /* @todo FIXME: got replay error response, send request to other members */
-
-  return GNUNET_OK;
-}
-
-
 /**
  * A new client connected.
  *
@@ -1898,32 +2048,6 @@ client_notify_disconnect (void *cls,
 }
 
 
-/**
- * Message handlers for CADET.
- */
-static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
-  { cadet_recv_join_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
-
-  { cadet_recv_join_decision,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
-
-  { cadet_recv_message,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
-
-  { cadet_recv_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
-
-  { cadet_recv_replay_request,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
-
-  { cadet_recv_replay_response,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
-
-  { NULL, 0, 0 }
-};
-
-
 /**
  * Service started.
  *
@@ -1949,9 +2073,8 @@ run (void *cls,
   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
 
-  cadet = GNUNET_CADET_connect (cfg, NULL,
-                                cadet_notify_channel_end,
-                                cadet_handlers);
+  cadet = GNUNET_CADET_connecT (cfg);
+
   GNUNET_assert (NULL != cadet);
 
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
index bb3ae447c042212d6463844584b7067b266d8d90..1b76737f4f90312b0bf1b94ad6bfc1f3cee72af7 100644 (file)
@@ -35,7 +35,7 @@
 
 #define NUM_PEERS 2
 
-static struct GNUNET_TESTBED_Operation *op0; 
+static struct GNUNET_TESTBED_Operation *op0;
 static struct GNUNET_TESTBED_Operation *op1;
 static struct GNUNET_TESTBED_Operation *pi_op0;
 static struct GNUNET_TESTBED_Operation *pi_op1;
@@ -115,19 +115,19 @@ timeout_task (void *cls)
 }
 
 
-static void 
+static void
 member_join_request (void *cls,
                      const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key,
                      const struct GNUNET_MessageHeader *join_msg,
                      struct GNUNET_MULTICAST_JoinHandle *jh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Member sent a join request.\n");
-  
+
 }
 
 
-static void 
+static void
 member_join_decision (void *cls,
                       int is_admitted,
                       const struct GNUNET_PeerIdentity *peer,
@@ -135,49 +135,49 @@ member_join_decision (void *cls,
                       const struct GNUNET_PeerIdentity *relays,
                       const struct GNUNET_MessageHeader *join_msg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Member received a decision from origin: %s\n", (GNUNET_YES == is_admitted)?"accepted":"rejected");
-  
+
   result = GNUNET_OK;
   GNUNET_SCHEDULER_shutdown ();
 }
 
-static void 
+static void
 member_replay_frag ()
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "member replay frag...\n");
 }
 
-static void 
-member_replay_msg () 
+static void
+member_replay_msg ()
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "member replay msg...\n");
 }
 
-static void 
-member_message () 
+static void
+member_message ()
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "member message...\n");
 }
 
-static void 
+static void
 origin_join_request (void *cls,
                  const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key,
                  const struct GNUNET_MessageHeader *join_msg,
-                 struct GNUNET_MULTICAST_JoinHandle *jh) 
+                 struct GNUNET_MULTICAST_JoinHandle *jh)
 {
   struct GNUNET_MessageHeader *join_resp;
 
   uint8_t data_size = ntohs (join_msg->size);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Dizzy: Mh, got a join request...\n");
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "'%s'\n", (char *)&join_msg[1]);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Dizzy: Oh, it's Bird! Let's get him in.\n");
 
   char data[] = "Hi, Bird. Come in!";
@@ -196,7 +196,7 @@ origin_join_request (void *cls,
   result = GNUNET_OK;
 }
 
-static void 
+static void
 origin_replay_frag (void *cls,
                 const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key,
                 uint64_t fragment_id,
@@ -206,15 +206,15 @@ origin_replay_frag (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin replay fraq msg\n");
 }
 
-static void 
+static void
 origin_replay_msg (void *cls,
                const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key,
                uint64_t message_id,
                uint64_t fragment_offset,
                uint64_t flags,
-               struct GNUNET_MULTICAST_ReplayHandle *rh) 
+               struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-  
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin replay msg\n");
 }
 
@@ -223,12 +223,12 @@ origin_request (void *cls,
             const struct GNUNET_MULTICAST_RequestHeader *req)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin request msg\n");
-  
+
 }
 
 static void
 origin_message (void *cls,
-            const struct GNUNET_MULTICAST_MessageHeader *msg) 
+            const struct GNUNET_MULTICAST_MessageHeader *msg)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin message msg\n");
 }
@@ -240,11 +240,11 @@ service_connect1 (void *cls,
                   void *ca_result,
                   const char *emsg)
 {
-  member = ca_result; 
+  member = ca_result;
 
   if (NULL != member)
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connected to multicast service of member\n");
-  else 
+  else
     result = GNUNET_SYSERR;
 }
 
@@ -264,21 +264,21 @@ multicast_ca1 (void *cls,
                const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct GNUNET_MessageHeader *join_msg;
-  
+
   // Get members keys
   member_key = GNUNET_CRYPTO_ecdsa_key_create ();
   GNUNET_CRYPTO_ecdsa_key_get_public (member_key, &member_pub_key);
-  
+
   char data[] = "Whut's up, Dizzy!";
   uint8_t data_size = strlen (data) + 1;
   join_msg = GNUNET_malloc (sizeof (join_msg) + data_size);
   join_msg->size = htons (sizeof (join_msg) + data_size);
   join_msg->type = htons (123);
   GNUNET_memcpy (&join_msg[1], data, data_size);
-  
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Members tries to join multicast group\n");
-  
+
   return GNUNET_MULTICAST_member_join (cfg,
                                        &group_pub_key,
                                        member_key,
@@ -301,13 +301,13 @@ peer_information_cb (void *cls,
                      const struct GNUNET_TESTBED_PeerInformation *pinfo,
                      const char *emsg)
 {
-  int i = (int) cls;
+  int i = (int) (long) cls;
 
   if (NULL == pinfo) {
     result = GNUNET_SYSERR;
     GNUNET_SCHEDULER_shutdown ();
   }
-  
+
   peer_id[i] = pinfo->result.id;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -376,13 +376,13 @@ service_connect0 (void *cls,
  * Function run when service multicast has started and is providing us
  * with a configuration file.
  */
-static void * 
+static void *
 multicast_ca0 (void *cls,
                const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   group_key = GNUNET_CRYPTO_eddsa_key_create ();
   GNUNET_CRYPTO_eddsa_key_get_public (group_key, &group_pub_key);
-              
+
   return GNUNET_MULTICAST_origin_start (cfg,
                                         group_key,
                                         0,
@@ -433,7 +433,7 @@ testbed_master (void *cls,
      topology (FIXME)  */
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Connected to testbed_master()\n");
-              
+
   peers = p;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -451,7 +451,7 @@ testbed_master (void *cls,
                                         NULL);                   /* closure for the above two callbacks */
 
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* Schedule a new task on shutdown */
-  
+
   /* Schedule the shutdown task with a delay of a few Seconds */
   timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 40),
                                              &timeout_task, NULL);