Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / set / gnunet-service-set.c
index 5633561ac4a9f45ff34250ffc20ebdff72be1caf..b0f8b209186c1ee4a89036ddb2f04b06df2638bd 100644 (file)
@@ -223,6 +223,9 @@ listener_destroy (struct Listener *listener)
   {
     struct GNUNET_SERVICE_Client *client = listener->client;
 
+    GNUNET_MQ_destroy (listener->client_mq);
+    listener->client_mq = NULL;
+
     listener->client = NULL;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Disconnecting listener client\n");
@@ -610,42 +613,6 @@ client_connect_cb (void *cls,
 }
 
 
-/**
- * Clean up after a client has disconnected
- *
- * @param cls closure, unused
- * @param client the client to clean up after
- * @param internal_cls our client-specific internal data structure
- */
-static void
-client_disconnect_cb (void *cls,
-                      struct GNUNET_SERVICE_Client *client,
-                      void *internal_cls)
-{
-  struct Listener *listener;
-  struct Set *set;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "client disconnected, cleaning up\n");
-  set = set_get (client);
-  if (NULL != set)
-  {
-    set->client = NULL;
-    set_destroy (set);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Client's set destroyed\n");
-  }
-  listener = listener_get (client);
-  if (NULL != listener)
-  {
-    listener->client = NULL;
-    listener_destroy (listener);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Client's listener destroyed\n");
-  }
-}
-
-
 /**
  * Destroy an incoming request from a remote peer
  *
@@ -680,6 +647,52 @@ incoming_destroy (struct Operation *incoming)
 }
 
 
+/**
+ * Clean up after a client has disconnected
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ * @param internal_cls our client-specific internal data structure
+ */
+static void
+client_disconnect_cb (void *cls,
+                      struct GNUNET_SERVICE_Client *client,
+                      void *internal_cls)
+{
+  struct Set *set;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "client disconnected, cleaning up\n");
+  set = set_get (client);
+  if (NULL != set)
+  {
+    set->client = NULL;
+    set_destroy (set);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client's set destroyed\n");
+  }
+  struct Listener *listener = listener_get (client);
+  struct Operation *op = incoming_head;
+  if (NULL != listener)
+  {
+    /* destroy all incoming operations whose client just
+     * got destroyed */
+    while (NULL != op)
+    {
+      struct Operation *curr = op;
+      op = op->next;
+      if ( (GNUNET_YES == curr->is_incoming) && 
+           (curr->listener == listener) )
+        incoming_destroy (curr);
+    }
+    listener->client = NULL;
+    listener_destroy (listener);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client's listener destroyed\n");
+  }
+}
+
+
 /**
  * Suggest the given request to the listener. The listening client can
  * then accept or reject the remote request.
@@ -781,7 +794,7 @@ handle_incoming_msg (struct Operation *op,
   listener = op->listener;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received P2P operation request (op %u, port %s) for active listener\n",
-              ntohl (msg->operation),
+              (uint32_t) ntohl (msg->operation),
               GNUNET_h2s (&listener->app_id));
   incoming_suggest (op,
                     listener);
@@ -805,16 +818,16 @@ execute_add (struct Set *set,
   el.data = &msg[1];
   el.element_type = ntohs (msg->element_type);
   GNUNET_SET_element_hash (&el, &hash);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Client inserts element %s of size %u\n",
-              GNUNET_h2s (&hash),
-              el.size);
 
   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
                                           &hash);
 
   if (NULL == ee)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client inserts element %s of size %u\n",
+                GNUNET_h2s (&hash),
+                el.size);
     ee = GNUNET_malloc (el.size + sizeof *ee);
     ee->element.size = el.size;
     GNUNET_memcpy (&ee[1],
@@ -834,6 +847,11 @@ execute_add (struct Set *set,
   }
   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client inserted element %s of size %u twice (ignored)\n",
+                GNUNET_h2s (&hash),
+                el.size);
+
     /* same element inserted twice */
     return;
   }
@@ -843,7 +861,9 @@ execute_add (struct Set *set,
       .generation = set->current_generation,
       .added = GNUNET_YES
     };
-    GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+    GNUNET_array_append (ee->mutations,
+                         ee->mutations_size,
+                         mut);
   }
 
   set->vt->add (set->state, ee);
@@ -863,9 +883,6 @@ execute_remove (struct Set *set,
 
   msg = (const struct GNUNET_SET_ElementMessage *) m;
   el.size = ntohs (m->size) - sizeof *msg;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Client removes element of size %u\n",
-              el.size);
   el.data = &msg[1];
   el.element_type = ntohs (msg->element_type);
   GNUNET_SET_element_hash (&el, &hash);
@@ -874,11 +891,17 @@ execute_remove (struct Set *set,
   if (NULL == ee)
   {
     /* Client tried to remove non-existing element. */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client removes non-existing element of size %u\n",
+                el.size);
     return;
   }
   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
   {
     /* Client tried to remove element twice */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client removed element of size %u twice (ignored)\n",
+                el.size);
     return;
   }
   else
@@ -887,7 +910,14 @@ execute_remove (struct Set *set,
       .generation = set->current_generation,
       .added = GNUNET_NO
     };
-    GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client removes element of size %u\n",
+                el.size);
+
+    GNUNET_array_append (ee->mutations,
+                         ee->mutations_size,
+                         mut);
   }
   set->vt->remove (set->state, ee);
 }
@@ -1058,7 +1088,7 @@ handle_client_create_set (void *cls,
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client created new set (operation %u)\n",
-              ntohl (msg->operation));
+              (uint32_t) ntohl (msg->operation));
   if (NULL != set_get (client))
   {
     /* There can only be one set per client */
@@ -1353,6 +1383,14 @@ handle_client_listen (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
                            struct GNUNET_MessageHeader,
                            NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                           struct GNUNET_MessageHeader,
+                           NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
                            struct GNUNET_MessageHeader,
@@ -1361,6 +1399,10 @@ handle_client_listen (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                            struct GNUNET_MessageHeader,
                            NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
@@ -1376,7 +1418,6 @@ handle_client_listen (void *cls,
     GNUNET_MQ_handler_end ()
   };
   struct Listener *listener;
-  struct Operation *op;
 
   if (NULL != listener_get (client))
   {
@@ -1405,7 +1446,7 @@ handle_client_listen (void *cls,
                                                 &channel_end_cb,
                                                 cadet_handlers);
   /* check for existing incoming requests the listener might be interested in */
-  for (op = incoming_head; NULL != op; op = op->next)
+  for (struct Operation *op = incoming_head; NULL != op; op = op->next)
   {
     if (NULL == op->spec)
       continue; /* no details available yet */
@@ -1505,9 +1546,9 @@ handle_client_mutation (void *cls,
     pm = GNUNET_new (struct PendingMutation);
     pm->mutation_message = GNUNET_copy_message (m);
     pm->set = set;
-    GNUNET_CONTAINER_DLL_insert (set->content->pending_mutations_head,
-                                 set->content->pending_mutations_tail,
-                                 pm);
+    GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
+                                      set->content->pending_mutations_tail,
+                                      pm);
     return;
   }
   execute_mutation (set, m);
@@ -1616,6 +1657,18 @@ handle_client_evaluate (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                            struct GNUNET_MessageHeader,
                            op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           op),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
@@ -1651,6 +1704,10 @@ handle_client_evaluate (void *cls,
   spec->set = set;
   spec->result_mode = ntohl (msg->result_mode);
   spec->client_request_id = ntohl (msg->request_id);
+  spec->byzantine = msg->byzantine;
+  spec->byzantine_lower_bound = msg->byzantine_lower_bound;
+  spec->force_full = msg->force_full;
+  spec->force_delta = msg->force_delta;
   context = GNUNET_MQ_extract_nested_mh (msg);
   op->spec = spec;
 
@@ -1901,7 +1958,7 @@ handle_client_cancel (void *cls,
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client requested cancel for op %u\n",
-              ntohl (msg->request_id));
+              (uint32_t) ntohl (msg->request_id));
   found = GNUNET_NO;
   for (op = set->ops_head; NULL != op; op = op->next)
   {
@@ -1975,7 +2032,7 @@ handle_client_accept (void *cls,
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client accepting request %u\n",
-              ntohl (msg->accept_reject_id));
+              (uint32_t) ntohl (msg->accept_reject_id));
   GNUNET_assert (GNUNET_YES == op->is_incoming);
   op->is_incoming = GNUNET_NO;
   GNUNET_CONTAINER_DLL_remove (incoming_head,
@@ -1987,6 +2044,10 @@ handle_client_accept (void *cls,
                                op);
   op->spec->client_request_id = ntohl (msg->request_id);
   op->spec->result_mode = ntohl (msg->result_mode);
+  op->spec->byzantine = msg->byzantine;
+  op->spec->byzantine_lower_bound = msg->byzantine_lower_bound;
+  op->spec->force_full = msg->force_full;
+  op->spec->force_delta = msg->force_delta;
 
   // Advance generation values, so that
   // mutations won't interfer with the running operation.