Add possibility to send multiple peers to client
authorJulius Bünger <buenger@mytum.de>
Tue, 18 Sep 2018 11:59:37 +0000 (13:59 +0200)
committerJulius Bünger <buenger@mytum.de>
Tue, 18 Sep 2018 15:20:26 +0000 (17:20 +0200)
src/include/gnunet_rps_service.h
src/rps/gnunet-rps.c
src/rps/gnunet-service-rps.c
src/rps/rps.h
src/rps/rps_api.c

index 252188c627a5cb57ae5d2fef710c2bfdfcebc717..eda012076d190d45a99209da864c31b27309119c 100644 (file)
@@ -79,6 +79,7 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls,
  * @param peer The received peer
  */
 typedef void (* GNUNET_RPS_StreamInputCB) (void *cls,
+    uint64_t num_peers,
     const struct GNUNET_PeerIdentity *peer);
 
 /**
index d2c497fd4b21c5f873e343cabecf441279cf44fe..d0f905f511d4e561fe0e0fd0323dcedceeb1926c 100644 (file)
@@ -59,7 +59,7 @@ static int stream_input;
 static uint64_t num_view_updates;
 
 /**
- * @brief Number of updates we want to receive
+ * @brief Number of peers we want to receive from stream
  */
 static uint64_t num_stream_peers;
 
@@ -154,11 +154,33 @@ view_update_handle (void *cls,
  */
 static void
 stream_input_handle (void *cls,
-                     const struct GNUNET_PeerIdentity *recv_peer)
+                     uint64_t num_peers,
+                     const struct GNUNET_PeerIdentity *recv_peers)
 {
-  // TODO when source of peer is sent, also print source
-  FPRINTF (stdout, "%s\n",
-           GNUNET_i2s_full (recv_peer));
+  uint64_t i;
+  (void) cls;
+
+  if (0 == num_peers)
+  {
+    FPRINTF (stdout, "Empty view\n");
+  }
+  req_handle = NULL;
+  for (i = 0; i < num_peers; i++)
+  {
+    FPRINTF (stdout, "%s\n",
+             GNUNET_i2s_full (&recv_peers[i]));
+
+    if (1 == num_stream_peers)
+    {
+      ret = 0;
+      GNUNET_SCHEDULER_shutdown ();
+      break;
+    }
+    else if (1 < num_stream_peers)
+    {
+      num_stream_peers--;
+    }
+  }
 }
 
 
index 5b78bb4a812961d6dceb391767d6addd1e430928..4da73b09c937388a18d49c4268799f1cea6e6954 100644 (file)
@@ -2208,8 +2208,8 @@ send_view (const struct ClientContext *cli_ctx,
   out_msg->num_peers = htonl (view_size);
 
   GNUNET_memcpy (&out_msg[1],
-          view_array,
-          view_size * sizeof (struct GNUNET_PeerIdentity));
+                 view_array,
+                 view_size * sizeof (struct GNUNET_PeerIdentity));
   GNUNET_MQ_send (cli_ctx->mq, ev);
 }
 
@@ -2217,25 +2217,30 @@ send_view (const struct ClientContext *cli_ctx,
 /**
  * @brief Send peer from biased stream to client.
  *
+ * TODO merge with send_view, parameterise
+ *
  * @param cli_ctx the context of the client
  * @param view_array the peerids of the view as array (can be empty)
  * @param view_size the size of the view array (can be 0)
  */
 void
 send_stream_peer (const struct ClientContext *cli_ctx,
-                  const struct GNUNET_PeerIdentity *peer)
+                  uint64_t num_peers,
+                  const struct GNUNET_PeerIdentity *peers)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
 
-  GNUNET_assert (NULL != peer);
+  GNUNET_assert (NULL != peers);
 
-  ev = GNUNET_MQ_msg (out_msg,
-                      GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+  ev = GNUNET_MQ_msg_extra (out_msg,
+                            num_peers * sizeof (struct GNUNET_PeerIdentity),
+                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+  out_msg->num_peers = htonl (num_peers);
 
-  GNUNET_memcpy (&out_msg->peer,
-          peer,
-          sizeof (struct GNUNET_PeerIdentity));
+  GNUNET_memcpy (&out_msg[1],
+                 peers,
+                 num_peers * sizeof (struct GNUNET_PeerIdentity));
   GNUNET_MQ_send (cli_ctx->mq, ev);
 }
 
@@ -2288,36 +2293,45 @@ clients_notify_view_update (void)
  * @brief sends updates to clients that are interested
  */
 static void
-clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
-                            //enum StreamPeerSource)
+clients_notify_stream_peer (uint64_t num_peers,
+                            const struct GNUNET_PeerIdentity *peers)
+                            // TODO enum StreamPeerSource)
 {
   struct ClientContext *cli_ctx_iter;
+  uint64_t num_peers_send;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Got peer (%s) from biased stream - update all clients\n",
-      GNUNET_i2s (peer));
+      GNUNET_i2s (peers));
 
-  /* check size of view is small enough */
   for (cli_ctx_iter = cli_ctx_head;
        NULL != cli_ctx_iter;
        cli_ctx_iter = cli_ctx_head->next)
   {
-    if (1 < cli_ctx_iter->stream_peers_left)
+    if (0 < cli_ctx_iter->stream_peers_left)
     {
       /* Client wants to receive limited amount of updates */
-      cli_ctx_iter->stream_peers_left -= 1;
-    } else if (1 == cli_ctx_iter->stream_peers_left)
+      if (num_peers > cli_ctx_iter->stream_peers_left)
+      {
+        num_peers_send = num_peers - cli_ctx_iter->stream_peers_left;
+        cli_ctx_iter->stream_peers_left = 0;
+      }
+      else
+      {
+        num_peers_send = cli_ctx_iter->stream_peers_left - num_peers;
+        cli_ctx_iter->stream_peers_left -= num_peers_send;
+      }
+    } else if (0 > cli_ctx_iter->stream_peers_left)
     {
-      /* Last update of view for client */
-      cli_ctx_iter->stream_peers_left = -1;
-    } else if (0 > cli_ctx_iter->stream_peers_left) {
       /* Client is not interested in updates */
       continue;
+    } else /* _updates_left == 0 - infinite amount of updates */
+    {
+      num_peers_send = num_peers;
     }
-    /* else _updates_left == 0 - infinite amount of updates */
 
     /* send view */
-    send_stream_peer (cli_ctx_iter, peer);
+    send_stream_peer (cli_ctx_iter, num_peers_send, peers);
   }
 }
 
@@ -2338,7 +2352,7 @@ hist_update (void *cls,
     inserted = insert_in_view (&ids[i]);
     if (GNUNET_OK == inserted)
     {
-      clients_notify_stream_peer (&ids[i]);
+      clients_notify_stream_peer (1, &ids[i]);
     }
     to_file (file_name_view_log,
              "+%s\t(hist)",
@@ -2549,7 +2563,7 @@ insert_in_view_op (void *cls,
   inserted = insert_in_view (peer);
   if (GNUNET_OK == inserted)
   {
-    clients_notify_stream_peer (peer);
+    clients_notify_stream_peer (1, peer);
   }
 }
 
@@ -3834,7 +3848,7 @@ do_round (void *cls)
                                                                   permut[i]));
       if (GNUNET_OK == inserted)
       {
-        clients_notify_stream_peer (
+        clients_notify_stream_peer (1,
             CustomPeerMap_get_peer_by_index (push_map, permut[i]));
       }
       to_file (file_name_view_log,
@@ -3855,7 +3869,7 @@ do_round (void *cls)
             permut[i - first_border]));
       if (GNUNET_OK == inserted)
       {
-        clients_notify_stream_peer (
+        clients_notify_stream_peer (1,
             CustomPeerMap_get_peer_by_index (push_map, permut[i]));
       }
       to_file (file_name_view_log,
index 66b2dd962699535d6aa76b3722e338703e640a80..26615bfc5d745646b0b6227690647221f1a8ffaa 100644 (file)
@@ -250,11 +250,13 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply
   uint32_t id GNUNET_PACKED;
 
   /**
-   * @brief The peer of the biased stream
+   * Number of peers
    */
-  struct GNUNET_PeerIdentity peer;
+  uint64_t num_peers GNUNET_PACKED;
 
   // TODO maybe source of peer (pull/push list, peerinfo, ...)
+
+  /* Followed by num_peers * GNUNET_PeerIdentity */
 };
 
 GNUNET_NETWORK_STRUCT_END
index b7644540de8078d9cd0b3e9f5c7947a0bcaf259a..96660ded6c2f097f8911eaf96066d82ea203099a 100644 (file)
@@ -371,6 +371,34 @@ handle_view_update (void *cls,
 }
 
 
+/**
+ * This function is called, when the service sends another peer from the biased
+ * stream.
+ * It calls the callback the caller provided
+ * and disconnects afterwards.
+ *
+ * TODO merge with check_view_update
+ *
+ * @param msg the message
+ */
+static int
+check_stream_input (void *cls,
+                    const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
+{
+  uint16_t msize = ntohs (msg->header.size);
+  uint32_t num_peers = ntohl (msg->num_peers);
+  (void) cls;
+
+  msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
+  if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
+       (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
 /**
  * This function is called, when the service sends another peer from the biased
  * stream.
@@ -384,14 +412,17 @@ handle_stream_input (void *cls,
                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
 {
   struct GNUNET_RPS_Handle *h = cls;
+  const struct GNUNET_PeerIdentity *peers;
 
   /* Give the peers back */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "New peer of biased input stream\n");
+       "New peer of %" PRIu64 " biased input stream\n",
+       ntohl (msg->num_peers));
 
+  peers = (struct GNUNET_PeerIdentity *) &msg[1];
   GNUNET_assert (NULL != h);
   GNUNET_assert (NULL != h->stream_input_cb);
-  h->stream_input_cb (h->stream_input_cb, &msg->peer);
+  h->stream_input_cb (h->stream_input_cb, ntohl (msg->num_peers), peers);
 }
 
 
@@ -444,10 +475,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
                            h),
-    GNUNET_MQ_hd_fixed_size (stream_input,
-                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
-                             struct GNUNET_RPS_CS_DEBUG_StreamReply,
-                             h),
+    GNUNET_MQ_hd_var_size (stream_input,
+                           GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
+                           struct GNUNET_RPS_CS_DEBUG_StreamReply,
+                           h),
     GNUNET_MQ_handler_end ()
   };