indentation fixes
[oweals/gnunet.git] / src / set / gnunet-service-set.c
index cc172974ef979dda47cdff0925472a49ae88e294..e4e2535af94b84004865a34b4312fe335bb12b04 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013, 2014 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2013, 2014 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -66,6 +66,11 @@ struct Listener
    */
   struct GNUNET_HashCode app_id;
 
+  /**
+   * The port we are listening on with CADET.
+   */
+  struct GNUNET_CADET_Port *open_port;
+
   /**
    * The type of the operation.
    */
@@ -229,6 +234,7 @@ listener_destroy (struct Listener *listener)
     GNUNET_MQ_destroy (listener->client_mq);
     listener->client_mq = NULL;
   }
+  GNUNET_CADET_close_port (listener->open_port);
   GNUNET_CONTAINER_DLL_remove (listeners_head,
                                listeners_tail,
                                listener);
@@ -320,7 +326,8 @@ collect_generation_garbage (struct Set *set)
                                          &gc);
 }
 
-int
+
+static int
 is_excluded_generation (unsigned int generation,
                         struct GenerationRange *excluded,
                         unsigned int excluded_size)
@@ -337,7 +344,7 @@ is_excluded_generation (unsigned int generation,
 }
 
 
-int
+static int
 is_element_of_generation (struct ElementEntry *ee,
                           unsigned int query_generation,
                           struct GenerationRange *excluded,
@@ -611,8 +618,8 @@ static void
 handle_client_disconnect (void *cls,
                           struct GNUNET_SERVER_Client *client)
 {
-  struct Set *set;
   struct Listener *listener;
+  struct Set *set;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "client disconnected, cleaning up\n");
@@ -674,29 +681,6 @@ incoming_destroy (struct Operation *incoming)
 }
 
 
-/**
- * Find a listener that is interested in the given operation type
- * and application id.
- *
- * @param op operation type to look for
- * @param app_id application id to look for
- * @return a matching listener, or NULL if no listener matches the
- *         given operation and application id
- */
-static struct Listener *
-listener_get_by_target (enum GNUNET_SET_OperationType op,
-                        const struct GNUNET_HashCode *app_id)
-{
-  struct Listener *listener;
-
-  for (listener = listeners_head; NULL != listener; listener = listener->next)
-    if ( (listener->operation == op) &&
-         (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) )
-      return listener;
-  return NULL;
-}
-
-
 /**
  * Suggest the given request to the listener. The listening client can
  * then accept or reject the remote request.
@@ -729,7 +713,8 @@ incoming_suggest (struct Operation *incoming,
               incoming->suggest_id);
   cmsg->accept_id = htonl (incoming->suggest_id);
   cmsg->peer_id = incoming->spec->peer;
-  GNUNET_MQ_send (listener->client_mq, mqm);
+  GNUNET_MQ_send (listener->client_mq,
+                  mqm);
 }
 
 
@@ -755,7 +740,7 @@ handle_incoming_msg (struct Operation *op,
                      const struct GNUNET_MessageHeader *mh)
 {
   const struct OperationRequestMessage *msg;
-  struct Listener *listener;
+  struct Listener *listener = op->listener;
   struct OperationSpecification *spec;
   const struct GNUNET_MessageHeader *nested_context;
 
@@ -787,29 +772,20 @@ handle_incoming_msg (struct Operation *op,
   if (NULL != nested_context)
     spec->context_msg = GNUNET_copy_message (nested_context);
   spec->operation = ntohl (msg->operation);
-  spec->app_id = msg->app_id;
+  spec->app_id = listener->app_id;
   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
                                          UINT32_MAX);
   spec->peer = op->peer;
   spec->remote_element_count = ntohl (msg->element_count);
   op->spec = spec;
 
-  listener = listener_get_by_target (ntohl (msg->operation),
-                                     &msg->app_id);
-  if (NULL == listener)
-  {
-    GNUNET_break (NULL != op->timeout_task);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "No matching listener for incoming request (op %u, app %s), waiting with timeout\n",
-                ntohl (msg->operation),
-                GNUNET_h2s (&msg->app_id));
-    return GNUNET_OK;
-  }
+  listener = op->listener;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received P2P operation request (op %u, app %s) for active listener\n",
+              "Received P2P operation request (op %u, port %s) for active listener\n",
               ntohl (msg->operation),
-              GNUNET_h2s (&msg->app_id));
-  incoming_suggest (op, listener);
+              GNUNET_h2s (&listener->app_id));
+  incoming_suggest (op,
+                    listener);
   return GNUNET_OK;
 }
 
@@ -827,12 +803,13 @@ execute_add (struct Set *set,
 
   msg = (const struct GNUNET_SET_ElementMessage *) m;
   el.size = ntohs (m->size) - sizeof *msg;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Client inserts element of size %u\n",
-              el.size);
   el.data = &msg[1];
   el.element_type = ntohs (msg->element_type);
   GNUNET_SET_element_hash (&el, &hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Client inserts element %s of size %u\n",
+              GNUNET_h2s (&hash),
+              el.size);
 
   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
                                           &hash);
@@ -841,7 +818,7 @@ execute_add (struct Set *set,
   {
     ee = GNUNET_malloc (el.size + sizeof *ee);
     ee->element.size = el.size;
-    memcpy (&ee[1],
+    GNUNET_memcpy (&ee[1],
             el.data,
             el.size);
     ee->element.data = &ee[1];
@@ -974,7 +951,7 @@ again:
     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
     set->iter = NULL;
     set->iteration_id++;
-    
+
     GNUNET_assert (set->content->iterator_count > 0);
     set->content->iterator_count -= 1;
 
@@ -1011,7 +988,7 @@ again:
     ev = GNUNET_MQ_msg_extra (msg,
                               ee->element.size,
                               GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
-    memcpy (&msg[1],
+    GNUNET_memcpy (&msg[1],
             ee->element.data,
             ee->element.size);
     msg->element_type = htons (ee->element.element_type);
@@ -1112,6 +1089,13 @@ handle_client_create_set (void *cls,
   }
   set->operation = ntohl (msg->operation);
   set->state = set->vt->create ();
+  if (NULL == set->state)
+  {
+    /* initialization failed (i.e. out of memory) */
+    GNUNET_free (set);
+    GNUNET_SERVER_client_disconnect (client);
+    return;
+  }
   set->content = GNUNET_new (struct SetContent);
   set->content->refcount = 1;
   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
@@ -1125,6 +1109,100 @@ handle_client_create_set (void *cls,
 }
 
 
+/**
+ * Timeout happens iff:
+ *  - we suggested an operation to our listener,
+ *    but did not receive a response in time
+ *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+ *
+ * @param cls channel context
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+incoming_timeout_cb (void *cls)
+{
+  struct Operation *incoming = cls;
+
+  incoming->timeout_task = NULL;
+  GNUNET_assert (GNUNET_YES == incoming->is_incoming);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Remote peer's incoming request timed out\n");
+  incoming_destroy (incoming);
+}
+
+
+/**
+ * Terminates an incoming operation in case we have not yet received an
+ * operation request. Called by the channel destruction handler.
+ *
+ * @param op the channel context
+ */
+static void
+handle_incoming_disconnect (struct Operation *op)
+{
+  GNUNET_assert (GNUNET_YES == op->is_incoming);
+  /* channel is already dead, incoming_destroy must not
+   * destroy it ... */
+  op->channel = NULL;
+  incoming_destroy (op);
+  op->vt = NULL;
+}
+
+
+/**
+ * Method called whenever another peer has added us to a channel the
+ * other peer initiated.  Only called (once) upon reception of data
+ * from a channel we listen on.
+ *
+ * The channel context represents the operation itself and gets added
+ * to a DLL, from where it gets looked up when our local listener
+ * client responds to a proposed/suggested operation or connects and
+ * associates with this operation.
+ *
+ * @param cls closure
+ * @param channel new handle to the channel
+ * @param initiator peer that started the channel
+ * @param port Port this channel is for.
+ * @param options Unused.
+ * @return initial channel context for the channel
+ *         returns NULL on error
+ */
+static void *
+channel_new_cb (void *cls,
+                struct GNUNET_CADET_Channel *channel,
+                const struct GNUNET_PeerIdentity *initiator,
+                const struct GNUNET_HashCode *port,
+                enum GNUNET_CADET_ChannelOption options)
+{
+  static const struct SetVT incoming_vt = {
+    .msg_handler = &handle_incoming_msg,
+    .peer_disconnect = &handle_incoming_disconnect
+  };
+  struct Listener *listener = cls;
+  struct Operation *incoming;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "New incoming channel\n");
+  incoming = GNUNET_new (struct Operation);
+  incoming->listener = listener;
+  incoming->is_incoming = GNUNET_YES;
+  incoming->peer = *initiator;
+  incoming->channel = channel;
+  incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
+  incoming->vt = &incoming_vt;
+  incoming->timeout_task
+    = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
+                                    &incoming_timeout_cb,
+                                    incoming);
+  GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
+                                    incoming_tail,
+                                    incoming);
+  // incoming_suggest (incoming,
+  //                  listener);
+  return incoming;
+}
+
+
 /**
  * Called when a client wants to create a new listener.
  *
@@ -1157,8 +1235,12 @@ handle_client_listen (void *cls,
   GNUNET_CONTAINER_DLL_insert_tail (listeners_head,
                                     listeners_tail,
                                     listener);
+  listener->open_port = GNUNET_CADET_open_port (cadet,
+                                                &msg->app_id,
+                                                &channel_new_cb,
+                                                listener);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "New listener created (op %u, app %s)\n",
+              "New listener created (op %u, port %s)\n",
               listener->operation,
               GNUNET_h2s (&listener->app_id));
 
@@ -1179,7 +1261,8 @@ handle_client_listen (void *cls,
     incoming_suggest (op,
                       listener);
   }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVER_receive_done (client,
+                              GNUNET_OK);
 }
 
 
@@ -1297,6 +1380,7 @@ advance_generation (struct Set *set)
                        r);
 }
 
+
 /**
  * Called when a client wants to initiate a set operation with another
  * peer.  Initiates the CADET connection to the listener and sends the
@@ -1347,10 +1431,13 @@ handle_client_evaluate (void *cls,
   GNUNET_CONTAINER_DLL_insert (set->ops_head,
                                set->ops_tail,
                                op);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating new CADET channel to port %s\n",
+              GNUNET_h2s (&msg->app_id));
   op->channel = GNUNET_CADET_channel_create (cadet,
                                              op,
                                              &msg->target_peer,
-                                             GNUNET_APPLICATION_TYPE_SET,
+                                             &msg->app_id,
                                              GNUNET_CADET_OPTION_RELIABLE);
   op->mq = GNUNET_CADET_mq_create (op->channel);
   set->vt->evaluate (op,
@@ -1496,7 +1583,7 @@ handle_client_copy_lazy_connect (void *cls,
     {
       found = GNUNET_YES;
       break;
-    } 
+    }
   }
 
   if (GNUNET_NO == found)
@@ -1526,7 +1613,8 @@ handle_client_copy_lazy_connect (void *cls,
     return;
   }
 
-  if (NULL == set->vt->copy_state) {
+  if (NULL == set->vt->copy_state)
+  {
     /* Lazy copy not supported for this set operation */
     GNUNET_break (0);
     GNUNET_free (set);
@@ -1702,11 +1790,9 @@ handle_client_accept (void *cls,
  * Called to clean up, after a shutdown has been requested.
  *
  * @param cls closure
- * @param tc context information (why was this task triggered now)
  */
 static void
-shutdown_task (void *cls,
-               const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls)
 {
   while (NULL != incoming_head)
     incoming_destroy (incoming_head);
@@ -1728,113 +1814,13 @@ shutdown_task (void *cls,
 }
 
 
-/**
- * Timeout happens iff:
- *  - we suggested an operation to our listener,
- *    but did not receive a response in time
- *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
- *  - shutdown (obviously)
- *
- * @param cls channel context
- * @param tc context information (why was this task triggered now)
- */
-static void
-incoming_timeout_cb (void *cls,
-                     const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Operation *incoming = cls;
-
-  incoming->timeout_task = NULL;
-  GNUNET_assert (GNUNET_YES == incoming->is_incoming);
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Remote peer's incoming request timed out\n");
-  incoming_destroy (incoming);
-}
-
-
-/**
- * Terminates an incoming operation in case we have not yet received an
- * operation request. Called by the channel destruction handler.
- *
- * @param op the channel context
- */
-static void
-handle_incoming_disconnect (struct Operation *op)
-{
-  GNUNET_assert (GNUNET_YES == op->is_incoming);
-  /* channel is already dead, incoming_destroy must not
-   * destroy it ... */
-  op->channel = NULL;
-  incoming_destroy (op);
-  op->vt = NULL;
-}
-
-
-/**
- * Method called whenever another peer has added us to a channel the
- * other peer initiated.  Only called (once) upon reception of data
- * with a message type which was subscribed to in
- * GNUNET_CADET_connect().
- *
- * The channel context represents the operation itself and gets added to a DLL,
- * from where it gets looked up when our local listener client responds
- * to a proposed/suggested operation or connects and associates with this operation.
- *
- * @param cls closure
- * @param channel new handle to the channel
- * @param initiator peer that started the channel
- * @param port Port this channel is for.
- * @param options Unused.
- * @return initial channel context for the channel
- *         returns NULL on error
- */
-static void *
-channel_new_cb (void *cls,
-                struct GNUNET_CADET_Channel *channel,
-                const struct GNUNET_PeerIdentity *initiator,
-                uint32_t port,
-                enum GNUNET_CADET_ChannelOption options)
-{
-  static const struct SetVT incoming_vt = {
-    .msg_handler = &handle_incoming_msg,
-    .peer_disconnect = &handle_incoming_disconnect
-  };
-  struct Operation *incoming;
-
-  if (GNUNET_APPLICATION_TYPE_SET != port)
-  {
-    GNUNET_break (0);
-    GNUNET_CADET_channel_destroy (channel);
-    return NULL;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "New incoming channel\n");
-  incoming = GNUNET_new (struct Operation);
-  incoming->is_incoming = GNUNET_YES;
-  incoming->peer = *initiator;
-  incoming->channel = channel;
-  incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
-  incoming->vt = &incoming_vt;
-  incoming->timeout_task
-    = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
-                                    &incoming_timeout_cb,
-                                    incoming);
-  GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
-                                    incoming_tail,
-                                    incoming);
-  return incoming;
-}
-
-
 /**
  * Function called whenever a channel is destroyed.  Should clean up
  * any associated state.  It must NOT call
  * GNUNET_CADET_channel_destroy() on the channel.
  *
  * The peer_disconnect function is part of a a virtual table set initially either
- * when a peer creates a new channel with us (#channel_new_cb()), or once we create
+ * when a peer creates a new channel with us, or once we create
  * a new channel ourselves (evaluate).
  *
  * Once we know the exact type of operation (union/intersection), the vt is
@@ -1879,7 +1865,7 @@ channel_end_cb (void *cls,
  * received via a cadet channel.
  *
  * The msg_handler is a virtual table set in initially either when a peer
- * creates a new channel with us (channel_new_cb), or once we create a new channel
+ * creates a new channel with us, or once we create a new channel
  * ourselves (evaluate).
  *
  * Once we know the exact type of operation (union/intersection), the vt is
@@ -1980,26 +1966,25 @@ run (void *cls,
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0},
     {NULL, 0, 0}
   };
-  static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
 
   configuration = cfg;
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
-                                &shutdown_task, NULL);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
   GNUNET_SERVER_disconnect_notify (server,
-                                   &handle_client_disconnect, NULL);
+                                   &handle_client_disconnect,
+                                   NULL);
   GNUNET_SERVER_add_handlers (server,
                               server_handlers);
   _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
-  cadet = GNUNET_CADET_connect (cfg, NULL,
-                                &channel_new_cb,
+  cadet = GNUNET_CADET_connect (cfg,
+                                NULL,
                                 &channel_end_cb,
-                                cadet_handlers,
-                                cadet_ports);
+                                cadet_handlers);
   if (NULL == cadet)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,