remove legacy core api code (now dead)
[oweals/gnunet.git] / src / set / set_api.c
index d6247501309f65a7617a1759a6a08be49ed8f0f6..7a33b86eaeb8d210ada05c1ec7f1ff0db9d5fa25 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2012-2014 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2012-2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 /**
  * @file set/set_api.c
  * @brief api for the set service
  * @author Florian Dold
+ * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_protocols.h"
-#include "gnunet_client_lib.h"
 #include "gnunet_set_service.h"
 #include "set.h"
 
 
 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
 
+struct SetCopyRequest
+{
+  struct SetCopyRequest *next;
+
+  struct SetCopyRequest *prev;
+
+  void *cls;
+
+  GNUNET_SET_CopyReadyCallback cb;
+};
+
 /**
  * Opaque handle to a set.
  */
 struct GNUNET_SET_Handle
 {
-  /**
-   * Client connected to the set service.
-   */
-  struct GNUNET_CLIENT_Connection *client;
-
   /**
    * Message queue for @e client.
    */
@@ -77,6 +83,27 @@ struct GNUNET_SET_Handle
    * Has the set become invalid (e.g. service died)?
    */
   int invalid;
+
+  /**
+   * Both client and service count the number of iterators
+   * created so far to match replies with iterators.
+   */
+  uint16_t iteration_id;
+
+  /**
+   * Configuration, needed when creating (lazy) copies.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
+   * Doubly linked list of copy requests.
+   */
+  struct SetCopyRequest *copy_req_head;
+
+  /**
+   * Doubly linked list of copy requests.
+   */
+  struct SetCopyRequest *copy_req_tail;
 };
 
 
@@ -156,10 +183,6 @@ struct GNUNET_SET_OperationHandle
  */
 struct GNUNET_SET_ListenHandle
 {
-  /**
-   * Connection to the service.
-   */
-  struct GNUNET_CLIENT_Connection *client;
 
   /**
    * Message queue for the client.
@@ -197,7 +220,7 @@ struct GNUNET_SET_ListenHandle
   /**
    * Task for reconnecting when the listener fails.
    */
-  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
   /**
    * Operation we listen for.
@@ -206,40 +229,95 @@ struct GNUNET_SET_ListenHandle
 };
 
 
+/* mutual recursion with handle_copy_lazy */
+static struct GNUNET_SET_Handle *
+create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                 enum GNUNET_SET_OperationType op,
+                 const uint32_t *cookie);
+
+
 /**
  * Handle element for iteration over the set.  Notifies the
  * iterator and sends an acknowledgement to the service.
  *
- * @param cls the set
- * @param mh the message
+ * @param cls the `struct GNUNET_SET_Handle *`
+ * @param msg the message
  */
 static void
-handle_iter_element (void *cls,
-                     const struct GNUNET_MessageHeader *mh)
+handle_copy_lazy (void *cls,
+                  const struct GNUNET_SET_CopyLazyResponseMessage *msg)
+{
+  struct GNUNET_SET_Handle *set = cls;
+  struct SetCopyRequest *req;
+  struct GNUNET_SET_Handle *new_set;
+
+  req = set->copy_req_head;
+  if (NULL == req)
+  {
+    /* Service sent us unsolicited lazy copy response */
+    GNUNET_break (0);
+    return;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handling response to lazy copy\n");
+  GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
+                               set->copy_req_tail,
+                               req);
+  // We pass none as operation here, since it doesn't matter when
+  // cloning.
+  new_set = create_internal (set->cfg,
+                            GNUNET_SET_OPERATION_NONE,
+                            &msg->cookie);
+  req->cb (req->cls, new_set);
+  GNUNET_free (req);
+}
+
+
+/**
+ * Check that the given @a msg is well-formed.
+ *
+ * @param cls closure
+ * @param msg message to check
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_iter_element (void *cls,
+                   const struct GNUNET_SET_IterResponseMessage *msg)
+{
+  /* minimum size was already checked, everything else is OK! */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle element for iteration over the set.  Notifies the
+ * iterator and sends an acknowledgement to the service.
+ *
+ * @param cls the `struct GNUNET_SET_Handle *`
+ * @param mh the message
+ */
+ static void
+ handle_iter_element (void *cls,
+                      const struct GNUNET_SET_IterResponseMessage *msg)
 {
   struct GNUNET_SET_Handle *set = cls;
   GNUNET_SET_ElementIterator iter = set->iterator;
   struct GNUNET_SET_Element element;
-  const struct GNUNET_SET_IterResponseMessage *msg;
   struct GNUNET_SET_IterAckMessage *ack_msg;
   struct GNUNET_MQ_Envelope *ev;
   uint16_t msize;
 
-  msize = ntohs (mh->size);
-  if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
+  msize = ntohs (msg->header.size);
+  if (set->iteration_id != ntohs (msg->iteration_id))
   {
-    /* message malformed */
-    GNUNET_break (0);
-    set->iterator = NULL;
-    iter (set->iterator_cls,
-          NULL);
+    /* element from a previous iteration, skip! */
     iter = NULL;
   }
   if (NULL != iter)
   {
-    msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
-    element.element_type = htons (msg->element_type);
+    element.element_type = ntohs (msg->element_type);
     element.data = &msg[1];
     iter (set->iterator_cls,
           &element);
@@ -268,11 +346,28 @@ handle_iter_done (void *cls,
   if (NULL == iter)
     return;
   set->iterator = NULL;
+  set->iteration_id++;
   iter (set->iterator_cls,
         NULL);
 }
 
 
+/**
+ * Check that the given @a msg is well-formed.
+ *
+ * @param cls closure
+ * @param msg message to check
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_result (void *cls,
+             const struct GNUNET_SET_ResultMessage *msg)
+{
+  /* minimum size was already checked, everything else is OK! */
+  return GNUNET_OK;
+}
+
+
 /**
  * Handle result message for a set operation.
  *
@@ -281,17 +376,20 @@ handle_iter_done (void *cls,
  */
 static void
 handle_result (void *cls,
-               const struct GNUNET_MessageHeader *mh)
+               const struct GNUNET_SET_ResultMessage *msg)
 {
   struct GNUNET_SET_Handle *set = cls;
-  const struct GNUNET_SET_ResultMessage *msg;
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_Element e;
   enum GNUNET_SET_Status result_status;
+  int destroy_set;
 
-  msg = (const struct GNUNET_SET_ResultMessage *) mh;
   GNUNET_assert (NULL != set->mq);
   result_status = ntohs (msg->result_status);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got result message with status %d\n",
+       result_status);
+
   oh = GNUNET_MQ_assoc_get (set->mq,
                             ntohl (msg->request_id));
   if (NULL == oh)
@@ -302,79 +400,60 @@ handle_result (void *cls,
                 "Ignoring result from canceled operation\n");
     return;
   }
-  if (GNUNET_SET_STATUS_OK != result_status)
+
+  switch (result_status)
   {
-    /* status is not STATUS_OK => there's no attached element,
-     * and this is the last result message we get */
-    GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
-    GNUNET_CONTAINER_DLL_remove (set->ops_head,
-                                 set->ops_tail,
-                                 oh);
-    if ( (GNUNET_YES == set->destroy_requested) &&
-         (NULL == set->ops_head) )
-      GNUNET_SET_destroy (set);
-    if (NULL != oh->result_cb)
-      oh->result_cb (oh->result_cls,
-                     NULL,
-                     result_status);
-    GNUNET_free (oh);
-    return;
+    case GNUNET_SET_STATUS_OK:
+    case GNUNET_SET_STATUS_ADD_LOCAL:
+    case GNUNET_SET_STATUS_ADD_REMOTE:
+      goto do_element;
+    case GNUNET_SET_STATUS_FAILURE:
+    case GNUNET_SET_STATUS_DONE:
+      goto do_final;
+    case GNUNET_SET_STATUS_HALF_DONE:
+      /* not used anymore */
+      GNUNET_assert (0);
   }
-  e.data = &msg[1];
-  e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
-  e.element_type = msg->element_type;
+
+do_final:
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Treating result as final status\n");
+  GNUNET_MQ_assoc_remove (set->mq,
+                          ntohl (msg->request_id));
+  GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                               set->ops_tail,
+                               oh);
+  /* Need to do this calculation _before_ the result callback,
+     as IF the application still has a valid set handle, it
+     may trigger destruction of the set during the callback. */
+  destroy_set = (GNUNET_YES == set->destroy_requested) &&
+                (NULL == set->ops_head);
   if (NULL != oh->result_cb)
+  {
     oh->result_cb (oh->result_cls,
-                   &e,
+                   NULL,
                    result_status);
-}
-
-
-/**
- * Handle request message for a listen operation
- *
- * @param cls the listen handle
- * @param mh the message
- */
-static void
-handle_request (void *cls,
-                const struct GNUNET_MessageHeader *mh)
-{
-  const struct GNUNET_SET_RequestMessage *msg = (const struct GNUNET_SET_RequestMessage *) mh;
-  struct GNUNET_SET_ListenHandle *lh = cls;
-  struct GNUNET_SET_Request *req;
-  const struct GNUNET_MessageHeader *context_msg;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "processing operation request\n");
-  req = GNUNET_new (struct GNUNET_SET_Request);
-  req->accept_id = ntohl (msg->accept_id);
-  context_msg = GNUNET_MQ_extract_nested_mh (msg);
-  /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
-  lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req);
-
-  /* we got another request => reset the backoff */
-  lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-
-  if (GNUNET_NO == req->accepted)
+  }
+  else
   {
-    struct GNUNET_MQ_Envelope *mqm;
-    struct GNUNET_SET_RejectMessage *rmsg;
-
-    mqm = GNUNET_MQ_msg (rmsg,
-                         GNUNET_MESSAGE_TYPE_SET_REJECT);
-    rmsg->accept_reject_id = msg->accept_id;
-    GNUNET_MQ_send (lh->mq, mqm);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "rejecting request\n");
+         "No callback for final status\n");
   }
-  GNUNET_free (req);
+  if (destroy_set)
+    GNUNET_SET_destroy (set);
+  GNUNET_free (oh);
+  return;
 
+do_element:
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "processed op request from service\n");
-
-  /* the accept-case is handled in GNUNET_SET_accept,
-   * as we have the accept message available there */
+       "Treating result as element\n");
+  e.data = &msg[1];
+  e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
+  e.element_type = ntohs (msg->element_type);
+  if (NULL != oh->result_cb)
+    oh->result_cb (oh->result_cls,
+                   &e,
+                   result_status);
 }
 
 
@@ -449,9 +528,11 @@ handle_client_set_error (void *cls,
                          enum GNUNET_MQ_Error error)
 {
   struct GNUNET_SET_Handle *set = cls;
+  GNUNET_SET_ElementIterator iter = set->iterator;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Handling client set error\n");
+       "Handling client set error %d\n",
+       error);
   while (NULL != set->ops_head)
   {
     if (NULL != set->ops_head->result_cb)
@@ -460,6 +541,11 @@ handle_client_set_error (void *cls,
                                 GNUNET_SET_STATUS_FAILURE);
     set_operation_destroy (set->ops_head);
   }
+  set->iterator = NULL;
+  set->iteration_id++;
+  if (NULL != iter)
+    iter (set->iterator_cls,
+          NULL);
   set->invalid = GNUNET_YES;
   if (GNUNET_YES == set->destroy_requested)
   {
@@ -470,6 +556,69 @@ handle_client_set_error (void *cls,
 }
 
 
+static struct GNUNET_SET_Handle *
+create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                 enum GNUNET_SET_OperationType op,
+                 const uint32_t *cookie)
+{
+  struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
+  struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    GNUNET_MQ_hd_var_size (result,
+                           GNUNET_MESSAGE_TYPE_SET_RESULT,
+                           struct GNUNET_SET_ResultMessage,
+                           set),
+    GNUNET_MQ_hd_var_size (iter_element,
+                           GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
+                           struct GNUNET_SET_IterResponseMessage,
+                           set),
+    GNUNET_MQ_hd_fixed_size (iter_done,
+                             GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
+                             struct GNUNET_MessageHeader,
+                             set),
+    GNUNET_MQ_hd_fixed_size (copy_lazy,
+                             GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
+                             struct GNUNET_SET_CopyLazyResponseMessage,
+                             set),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SET_CreateMessage *create_msg;
+  struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
+
+  set->cfg = cfg;
+  set->mq = GNUNET_CLIENT_connecT (cfg,
+                                   "set",
+                                   mq_handlers,
+                                   &handle_client_set_error,
+                                   set);
+  if (NULL == set->mq)
+  {
+    GNUNET_free (set);
+    return NULL;
+  }
+  if (NULL == cookie)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Creating new set (operation %u)\n",
+         op);
+    mqm = GNUNET_MQ_msg (create_msg,
+                         GNUNET_MESSAGE_TYPE_SET_CREATE);
+    create_msg->operation = htonl (op);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Creating new set (lazy copy)\n",
+         op);
+    mqm = GNUNET_MQ_msg (copy_msg,
+                         GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
+    copy_msg->cookie = *cookie;
+  }
+  GNUNET_MQ_send (set->mq, mqm);
+  return set;
+}
+
+
 /**
  * Create an empty set, supporting the specified operation.
  *
@@ -485,34 +634,7 @@ struct GNUNET_SET_Handle *
 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
                    enum GNUNET_SET_OperationType op)
 {
-  static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    { &handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
-    { &handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0},
-    { &handle_iter_done,
-      GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
-      sizeof (struct GNUNET_MessageHeader) },
-    GNUNET_MQ_HANDLERS_END
-  };
-  struct GNUNET_SET_Handle *set;
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_CreateMessage *msg;
-
-  set = GNUNET_new (struct GNUNET_SET_Handle);
-  set->client = GNUNET_CLIENT_connect ("set", cfg);
-  if (NULL == set->client)
-  {
-    GNUNET_free (set);
-    return NULL;
-  }
-  set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
-                                                   mq_handlers,
-                                                   &handle_client_set_error, set);
-  GNUNET_assert (NULL != set->mq);
-  mqm = GNUNET_MQ_msg (msg,
-                       GNUNET_MESSAGE_TYPE_SET_CREATE);
-  msg->operation = htonl (op);
-  GNUNET_MQ_send (set->mq, mqm);
-  return set;
+  return create_internal (cfg, op, NULL);
 }
 
 
@@ -546,10 +668,10 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
   }
   mqm = GNUNET_MQ_msg_extra (msg, element->size,
                              GNUNET_MESSAGE_TYPE_SET_ADD);
-  msg->element_type = element->element_type;
-  memcpy (&msg[1],
-          element->data,
-          element->size);
+  msg->element_type = htons (element->element_type);
+  GNUNET_memcpy (&msg[1],
+                 element->data,
+                 element->size);
   GNUNET_MQ_notify_sent (mqm,
                          cont, cont_cls);
   GNUNET_MQ_send (set->mq, mqm);
@@ -588,10 +710,10 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
   mqm = GNUNET_MQ_msg_extra (msg,
                              element->size,
                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
-  msg->element_type = element->element_type;
-  memcpy (&msg[1],
-          element->data,
-          element->size);
+  msg->element_type = htons (element->element_type);
+  GNUNET_memcpy (&msg[1],
+                 element->data,
+                 element->size);
   GNUNET_MQ_notify_sent (mqm,
                          cont, cont_cls);
   GNUNET_MQ_send (set->mq, mqm);
@@ -608,6 +730,10 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
 void
 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
 {
+  /* destroying set while iterator is active is currently
+     not supported; we should expand the API to allow
+     clients to explicitly cancel the iteration! */
+  GNUNET_assert (NULL == set->iterator);
   if (NULL != set->ops_head)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -617,11 +743,6 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Really destroying set\n");
-  if (NULL != set->client)
-  {
-    GNUNET_CLIENT_disconnect (set->client);
-    set->client = NULL;
-  }
   if (NULL != set->mq)
   {
     GNUNET_MQ_destroy (set->mq);
@@ -677,11 +798,74 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
  * Connect to the set service in order to listen for requests.
  *
  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
- * @param tc task context if invoked as a task, NULL otherwise
  */
 static void
-listen_connect (void *cls,
-                const struct GNUNET_SCHEDULER_TaskContext *tc);
+listen_connect (void *cls);
+
+
+/**
+ * Check validity of request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param msg the message
+ * @return #GNUNET_OK if the message is well-formed
+ */
+static int
+check_request (void *cls,
+              const struct GNUNET_SET_RequestMessage *msg)
+{
+  const struct GNUNET_MessageHeader *context_msg;
+
+  if (ntohs (msg->header.size) == sizeof (*msg))
+    return GNUNET_OK; /* no context message is OK */
+  context_msg = GNUNET_MQ_extract_nested_mh (msg);
+  if (NULL == context_msg)
+  {
+    /* malformed context message is NOT ok */
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param msg the message
+ */
+static void
+handle_request (void *cls,
+                const struct GNUNET_SET_RequestMessage *msg)
+{
+  struct GNUNET_SET_ListenHandle *lh = cls;
+  struct GNUNET_SET_Request req;
+  const struct GNUNET_MessageHeader *context_msg;
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SET_RejectMessage *rmsg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Processing incoming operation request\n");
+  /* we got another valid request => reset the backoff */
+  lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  req.accept_id = ntohl (msg->accept_id);
+  req.accepted = GNUNET_NO;
+  context_msg = GNUNET_MQ_extract_nested_mh (msg);
+  /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
+  lh->listen_cb (lh->listen_cls,
+                 &msg->peer_id,
+                 context_msg,
+                 &req);
+  if (GNUNET_YES == req.accepted)
+    return; /* the accept-case is handled in #GNUNET_SET_accept() */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Rejecting request\n");
+  mqm = GNUNET_MQ_msg (rmsg,
+                       GNUNET_MESSAGE_TYPE_SET_REJECT);
+  rmsg->accept_reject_id = msg->accept_id;
+  GNUNET_MQ_send (lh->mq, mqm);
+}
 
 
 /**
@@ -700,12 +884,11 @@ handle_client_listener_error (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Listener broke down (%d), re-connecting\n",
        (int) error);
-  GNUNET_CLIENT_disconnect (lh->client);
-  lh->client = NULL;
   GNUNET_MQ_destroy (lh->mq);
   lh->mq = NULL;
   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
-                                                     &listen_connect, lh);
+                                                     &listen_connect,
+                                                    lh);
   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
 }
 
@@ -714,40 +897,35 @@ handle_client_listener_error (void *cls,
  * Connect to the set service in order to listen for requests.
  *
  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
- * @param tc task context if invoked as a task, NULL otherwise
  */
 static void
-listen_connect (void *cls,
-                const struct GNUNET_SCHEDULER_TaskContext *tc)
+listen_connect (void *cls)
 {
-  static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
-    GNUNET_MQ_HANDLERS_END
-  };
   struct GNUNET_SET_ListenHandle *lh = cls;
+  struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    GNUNET_MQ_hd_var_size (request,
+                           GNUNET_MESSAGE_TYPE_SET_REQUEST,
+                           struct GNUNET_SET_RequestMessage,
+                           lh),
+    GNUNET_MQ_handler_end ()
+  };
   struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_ListenMessage *msg;
 
-  if ( (NULL != tc) &&
-       (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Listener not reconnecting due to shutdown\n");
-    return;
-  }
-  lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-
-  GNUNET_assert (NULL == lh->client);
-  lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
-  if (NULL == lh->client)
-    return;
+  lh->reconnect_task = NULL;
   GNUNET_assert (NULL == lh->mq);
-  lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
-                                                  &handle_client_listener_error, lh);
+  lh->mq = GNUNET_CLIENT_connecT (lh->cfg,
+                                  "set",
+                                  mq_handlers,
+                                  &handle_client_listener_error,
+                                  lh);
+  if (NULL == lh->mq)
+    return;
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
   msg->operation = htonl (lh->operation);
   msg->app_id = lh->app_id;
-  GNUNET_MQ_send (lh->mq, mqm);
+  GNUNET_MQ_send (lh->mq,
+                  mqm);
 }
 
 
@@ -779,8 +957,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
   lh->operation = operation;
   lh->app_id = *app_id;
   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-  listen_connect (lh, NULL);
-  if (NULL == lh->client)
+  listen_connect (lh);
+  if (NULL == lh->mq)
   {
     GNUNET_free (lh);
     return NULL;
@@ -804,15 +982,10 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
     GNUNET_MQ_destroy (lh->mq);
     lh->mq = NULL;
   }
-  if (NULL != lh->client)
-  {
-    GNUNET_CLIENT_disconnect (lh->client);
-    lh->client = NULL;
-  }
-  if (GNUNET_SCHEDULER_NO_TASK != lh->reconnect_task)
+  if (NULL != lh->reconnect_task)
   {
     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
-    lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+    lh->reconnect_task = NULL;
   }
   GNUNET_free (lh);
 }
@@ -873,7 +1046,13 @@ int
 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
                    struct GNUNET_SET_Handle *set)
 {
-  GNUNET_assert (NULL == oh->set);
+  if (NULL != oh->set)
+  {
+    /* Some other set was already commited for this
+     * operation, there is a logic bug in the client of this API */
+    GNUNET_break (0);
+    return GNUNET_OK;
+  }
   if (GNUNET_YES == set->invalid)
     return GNUNET_SYSERR;
   GNUNET_assert (NULL != oh->conclude_mqm);
@@ -923,4 +1102,69 @@ GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
   return GNUNET_YES;
 }
 
+
+void
+GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
+                      GNUNET_SET_CopyReadyCallback cb,
+                      void *cls)
+{
+  struct GNUNET_MQ_Envelope *ev;
+  struct SetCopyRequest *req;
+
+  ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
+  GNUNET_MQ_send (set->mq, ev);
+
+  req = GNUNET_new (struct SetCopyRequest);
+  req->cb = cb;
+  req->cls = cls;
+  GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
+                               set->copy_req_tail,
+                               req);
+}
+
+
+/**
+ * Create a copy of an element.  The copy
+ * must be GNUNET_free-d by the caller.
+ *
+ * @param element the element to copy
+ * @return the copied element
+ */
+struct GNUNET_SET_Element *
+GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
+{
+  struct GNUNET_SET_Element *copy;
+
+  copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
+  copy->size = element->size;
+  copy->element_type = element->element_type;
+  copy->data = &copy[1];
+  GNUNET_memcpy (&copy[1],
+                 element->data,
+                 copy->size);
+  return copy;
+}
+
+
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param[out] ret_hash a pointer to where the hash of @a element
+ *        should be stored
+ */
+void
+GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
+                        struct GNUNET_HashCode *ret_hash)
+{
+  struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
+
+  /* It's not guaranteed that the element data is always after the element header,
+     so we need to hash the chunks separately. */
+  GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
+  GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
+  GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
+  GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
+}
+
 /* end of set_api.c */