Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / set / gnunet-service-set.c
index 3f1086891f45f0411fd36a46b78b27a7cb7e5e0d..b0f8b209186c1ee4a89036ddb2f04b06df2638bd 100644 (file)
@@ -223,17 +223,15 @@ listener_destroy (struct Listener *listener)
   {
     struct GNUNET_SERVICE_Client *client = listener->client;
 
+    GNUNET_MQ_destroy (listener->client_mq);
+    listener->client_mq = NULL;
+
     listener->client = NULL;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Disconnecting listener client\n");
     GNUNET_SERVICE_client_drop (client);
     return;
   }
-  if (NULL != listener->client_mq)
-  {
-    GNUNET_MQ_destroy (listener->client_mq);
-    listener->client_mq = NULL;
-  }
   GNUNET_CADET_close_port (listener->open_port);
   GNUNET_CONTAINER_DLL_remove (listeners_head,
                                listeners_tail,
@@ -473,11 +471,6 @@ _GSS_operation_destroy (struct Operation *op,
     GNUNET_free (op->spec);
     op->spec = NULL;
   }
-  if (NULL != op->mq)
-  {
-    GNUNET_MQ_destroy (op->mq);
-    op->mq = NULL;
-  }
   if (NULL != (channel = op->channel))
   {
     op->channel = NULL;
@@ -537,11 +530,6 @@ set_destroy (struct Set *set)
     _GSS_operation_destroy (set->ops_head, GNUNET_NO);
   set->vt->destroy_set (set->state);
   set->state = NULL;
-  if (NULL != set->client_mq)
-  {
-    GNUNET_MQ_destroy (set->client_mq);
-    set->client_mq = NULL;
-  }
   if (NULL != set->iter)
   {
     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
@@ -625,42 +613,6 @@ client_connect_cb (void *cls,
 }
 
 
-/**
- * Clean up after a client has disconnected
- *
- * @param cls closure, unused
- * @param client the client to clean up after
- * @param internal_cls our client-specific internal data structure
- */
-static void
-client_disconnect_cb (void *cls,
-                      struct GNUNET_SERVICE_Client *client,
-                      void *internal_cls)
-{
-  struct Listener *listener;
-  struct Set *set;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "client disconnected, cleaning up\n");
-  set = set_get (client);
-  if (NULL != set)
-  {
-    set->client = NULL;
-    set_destroy (set);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Client's set destroyed\n");
-  }
-  listener = listener_get (client);
-  if (NULL != listener)
-  {
-    listener->client = NULL;
-    listener_destroy (listener);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Client's listener destroyed\n");
-  }
-}
-
-
 /**
  * Destroy an incoming request from a remote peer
  *
@@ -687,11 +639,6 @@ incoming_destroy (struct Operation *incoming)
     GNUNET_free (incoming->spec);
     incoming->spec = NULL;
   }
-  if (NULL != incoming->mq)
-  {
-    GNUNET_MQ_destroy (incoming->mq);
-    incoming->mq = NULL;
-  }
   if (NULL != (channel = incoming->channel))
   {
     incoming->channel = NULL;
@@ -700,6 +647,52 @@ incoming_destroy (struct Operation *incoming)
 }
 
 
+/**
+ * Clean up after a client has disconnected
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ * @param internal_cls our client-specific internal data structure
+ */
+static void
+client_disconnect_cb (void *cls,
+                      struct GNUNET_SERVICE_Client *client,
+                      void *internal_cls)
+{
+  struct Set *set;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "client disconnected, cleaning up\n");
+  set = set_get (client);
+  if (NULL != set)
+  {
+    set->client = NULL;
+    set_destroy (set);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client's set destroyed\n");
+  }
+  struct Listener *listener = listener_get (client);
+  struct Operation *op = incoming_head;
+  if (NULL != listener)
+  {
+    /* destroy all incoming operations whose client just
+     * got destroyed */
+    while (NULL != op)
+    {
+      struct Operation *curr = op;
+      op = op->next;
+      if ( (GNUNET_YES == curr->is_incoming) && 
+           (curr->listener == listener) )
+        incoming_destroy (curr);
+    }
+    listener->client = NULL;
+    listener_destroy (listener);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client's listener destroyed\n");
+  }
+}
+
+
 /**
  * Suggest the given request to the listener. The listening client can
  * then accept or reject the remote request.
@@ -801,7 +794,7 @@ handle_incoming_msg (struct Operation *op,
   listener = op->listener;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received P2P operation request (op %u, port %s) for active listener\n",
-              ntohl (msg->operation),
+              (uint32_t) ntohl (msg->operation),
               GNUNET_h2s (&listener->app_id));
   incoming_suggest (op,
                     listener);
@@ -825,16 +818,16 @@ execute_add (struct Set *set,
   el.data = &msg[1];
   el.element_type = ntohs (msg->element_type);
   GNUNET_SET_element_hash (&el, &hash);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Client inserts element %s of size %u\n",
-              GNUNET_h2s (&hash),
-              el.size);
 
   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
                                           &hash);
 
   if (NULL == ee)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client inserts element %s of size %u\n",
+                GNUNET_h2s (&hash),
+                el.size);
     ee = GNUNET_malloc (el.size + sizeof *ee);
     ee->element.size = el.size;
     GNUNET_memcpy (&ee[1],
@@ -854,6 +847,11 @@ execute_add (struct Set *set,
   }
   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client inserted element %s of size %u twice (ignored)\n",
+                GNUNET_h2s (&hash),
+                el.size);
+
     /* same element inserted twice */
     return;
   }
@@ -863,7 +861,9 @@ execute_add (struct Set *set,
       .generation = set->current_generation,
       .added = GNUNET_YES
     };
-    GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+    GNUNET_array_append (ee->mutations,
+                         ee->mutations_size,
+                         mut);
   }
 
   set->vt->add (set->state, ee);
@@ -883,9 +883,6 @@ execute_remove (struct Set *set,
 
   msg = (const struct GNUNET_SET_ElementMessage *) m;
   el.size = ntohs (m->size) - sizeof *msg;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Client removes element of size %u\n",
-              el.size);
   el.data = &msg[1];
   el.element_type = ntohs (msg->element_type);
   GNUNET_SET_element_hash (&el, &hash);
@@ -894,11 +891,17 @@ execute_remove (struct Set *set,
   if (NULL == ee)
   {
     /* Client tried to remove non-existing element. */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client removes non-existing element of size %u\n",
+                el.size);
     return;
   }
   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
   {
     /* Client tried to remove element twice */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client removed element of size %u twice (ignored)\n",
+                el.size);
     return;
   }
   else
@@ -907,7 +910,14 @@ execute_remove (struct Set *set,
       .generation = set->current_generation,
       .added = GNUNET_NO
     };
-    GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client removes element of size %u\n",
+                el.size);
+
+    GNUNET_array_append (ee->mutations,
+                         ee->mutations_size,
+                         mut);
   }
   set->vt->remove (set->state, ee);
 }
@@ -1078,7 +1088,7 @@ handle_client_create_set (void *cls,
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client created new set (operation %u)\n",
-              ntohl (msg->operation));
+              (uint32_t) ntohl (msg->operation));
   if (NULL != set_get (client))
   {
     /* There can only be one set per client */
@@ -1197,7 +1207,7 @@ channel_new_cb (void *cls,
   incoming->is_incoming = GNUNET_YES;
   incoming->peer = *source;
   incoming->channel = channel;
-  incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
+  incoming->mq = GNUNET_CADET_get_mq (incoming->channel);
   incoming->vt = &incoming_vt;
   incoming->timeout_task
     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
@@ -1373,6 +1383,14 @@ handle_client_listen (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
                            struct GNUNET_MessageHeader,
                            NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                           struct GNUNET_MessageHeader,
+                           NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
                            struct GNUNET_MessageHeader,
@@ -1381,6 +1399,10 @@ handle_client_listen (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                            struct GNUNET_MessageHeader,
                            NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
@@ -1396,7 +1418,6 @@ handle_client_listen (void *cls,
     GNUNET_MQ_handler_end ()
   };
   struct Listener *listener;
-  struct Operation *op;
 
   if (NULL != listener_get (client))
   {
@@ -1425,7 +1446,7 @@ handle_client_listen (void *cls,
                                                 &channel_end_cb,
                                                 cadet_handlers);
   /* check for existing incoming requests the listener might be interested in */
-  for (op = incoming_head; NULL != op; op = op->next)
+  for (struct Operation *op = incoming_head; NULL != op; op = op->next)
   {
     if (NULL == op->spec)
       continue; /* no details available yet */
@@ -1525,9 +1546,9 @@ handle_client_mutation (void *cls,
     pm = GNUNET_new (struct PendingMutation);
     pm->mutation_message = GNUNET_copy_message (m);
     pm->set = set;
-    GNUNET_CONTAINER_DLL_insert (set->content->pending_mutations_head,
-                                 set->content->pending_mutations_tail,
-                                 pm);
+    GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
+                                      set->content->pending_mutations_tail,
+                                      pm);
     return;
   }
   execute_mutation (set, m);
@@ -1636,6 +1657,18 @@ handle_client_evaluate (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                            struct GNUNET_MessageHeader,
                            op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           op),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
@@ -1671,6 +1704,10 @@ handle_client_evaluate (void *cls,
   spec->set = set;
   spec->result_mode = ntohl (msg->result_mode);
   spec->client_request_id = ntohl (msg->request_id);
+  spec->byzantine = msg->byzantine;
+  spec->byzantine_lower_bound = msg->byzantine_lower_bound;
+  spec->force_full = msg->force_full;
+  spec->force_delta = msg->force_delta;
   context = GNUNET_MQ_extract_nested_mh (msg);
   op->spec = spec;
 
@@ -1694,7 +1731,7 @@ handle_client_evaluate (void *cls,
                                              &channel_window_cb,
                                              &channel_end_cb,
                                              cadet_handlers);
-  op->mq = GNUNET_CADET_mq_create (op->channel);
+  op->mq = GNUNET_CADET_get_mq (op->channel);
   set->vt->evaluate (op,
                      context);
   GNUNET_SERVICE_client_continue (client);
@@ -1921,7 +1958,7 @@ handle_client_cancel (void *cls,
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client requested cancel for op %u\n",
-              ntohl (msg->request_id));
+              (uint32_t) ntohl (msg->request_id));
   found = GNUNET_NO;
   for (op = set->ops_head; NULL != op; op = op->next)
   {
@@ -1995,7 +2032,7 @@ handle_client_accept (void *cls,
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client accepting request %u\n",
-              ntohl (msg->accept_reject_id));
+              (uint32_t) ntohl (msg->accept_reject_id));
   GNUNET_assert (GNUNET_YES == op->is_incoming);
   op->is_incoming = GNUNET_NO;
   GNUNET_CONTAINER_DLL_remove (incoming_head,
@@ -2007,6 +2044,10 @@ handle_client_accept (void *cls,
                                op);
   op->spec->client_request_id = ntohl (msg->request_id);
   op->spec->result_mode = ntohl (msg->result_mode);
+  op->spec->byzantine = msg->byzantine;
+  op->spec->byzantine_lower_bound = msg->byzantine_lower_bound;
+  op->spec->force_full = msg->force_full;
+  op->spec->force_delta = msg->force_delta;
 
   // Advance generation values, so that
   // mutations won't interfer with the running operation.