-fixing misc issues and bugs, including better termination logic for intersection...
[oweals/gnunet.git] / src / set / set_api.c
index 267fe3fc646e187fe8966f709e5d41a6f55d1a7e..ac0be21feafe963b9a2607f8d31ba0e18127432b 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+     (C) 2012-2014 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
      Boston, MA 02111-1307, USA.
 */
-
 /**
  * @file set/set_api.c
  * @brief api for the set service
  * @author Florian Dold
+ * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
@@ -44,7 +44,7 @@ struct GNUNET_SET_Handle
   struct GNUNET_CLIENT_Connection *client;
 
   /**
-   * Message queue for 'client'.
+   * Message queue for @e client.
    */
   struct GNUNET_MQ_Handle *mq;
 
@@ -58,6 +58,17 @@ struct GNUNET_SET_Handle
    */
   struct GNUNET_SET_OperationHandle *ops_tail;
 
+  /**
+   * Callback for the current iteration over the set,
+   * NULL if no iterator is active.
+   */
+  GNUNET_SET_ElementIterator iterator;
+
+  /**
+   * Closure for @e iterator
+   */
+  void *iterator_cls;
+
   /**
    * Should the set be destroyed once all operations are gone?
    */
@@ -69,20 +80,15 @@ struct GNUNET_SET_Handle
   int invalid;
 
   /**
-   * Callback for the current iteration over the set,
-   * NULL if no iterator is active.
+   * Both client and service count the number of iterators
+   * created so far to match replies with iterators.
    */
-  GNUNET_SET_ElementIterator iterator;
-
-  /**
-   * Closure for 'iterator'
-   */
-  void *iterator_cls;
+  uint16_t iteration_id;
 };
 
 
 /**
- * Opaque handle to a set operation request from another peer.
+ * Handle for a set operation request from another peer.
  */
 struct GNUNET_SET_Request
 {
@@ -94,15 +100,14 @@ struct GNUNET_SET_Request
 
   /**
    * Has the request been accepted already?
-   * GNUNET_YES/GNUNET_NO
+   * #GNUNET_YES/#GNUNET_NO
    */
   int accepted;
 };
 
 
 /**
- * Handle to an operation.
- * Only known to the service after commiting
+ * Handle to an operation.  Only known to the service after committing
  * the handle with a set.
  */
 struct GNUNET_SET_OperationHandle
@@ -114,7 +119,7 @@ struct GNUNET_SET_OperationHandle
   GNUNET_SET_ResultIterator result_cb;
 
   /**
-   * Closure for result_cb.
+   * Closure for @e result_cb.
    */
   void *result_cls;
 
@@ -124,11 +129,6 @@ struct GNUNET_SET_OperationHandle
    */
   struct GNUNET_SET_Handle *set;
 
-  /**
-   * Request ID to identify the operation within the set.
-   */
-  uint32_t request_id;
-
   /**
    * Message sent to the server on calling conclude,
    * NULL if conclude has been called.
@@ -150,6 +150,11 @@ struct GNUNET_SET_OperationHandle
    * Handles are kept in a linked list.
    */
   struct GNUNET_SET_OperationHandle *next;
+
+  /**
+   * Request ID to identify the operation within the set.
+   */
+  uint32_t request_id;
 };
 
 
@@ -182,15 +187,10 @@ struct GNUNET_SET_ListenHandle
   GNUNET_SET_ListenCallback listen_cb;
 
   /**
-   * Closure for listen_cb.
+   * Closure for @e listen_cb.
    */
   void *listen_cls;
 
-  /**
-   * Operation we listen for.
-   */
-  enum GNUNET_SET_OperationType operation;
-
   /**
    * Application ID we listen for.
    */
@@ -200,59 +200,90 @@ struct GNUNET_SET_ListenHandle
    * Time to wait until we try to reconnect on failure.
    */
   struct GNUNET_TIME_Relative reconnect_backoff;
-};
 
+  /**
+   * Task for reconnecting when the listener fails.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
 
-/* forward declaration */
-static void
-listen_connect (void *cls,
-                const struct GNUNET_SCHEDULER_TaskContext *tc);
+  /**
+   * Operation we listen for.
+   */
+  enum GNUNET_SET_OperationType operation;
+};
 
 
 /**
- * Handle element for iteration over the set.
+ * Handle element for iteration over the set.  Notifies the
+ * iterator and sends an acknowledgement to the service.
  *
- * @param cls the set
+ * @param cls the `struct GNUNET_SET_Handle *`
  * @param mh the message
  */
 static void
-handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_iter_element (void *cls,
+                     const struct GNUNET_MessageHeader *mh)
 {
   struct GNUNET_SET_Handle *set = cls;
+  GNUNET_SET_ElementIterator iter = set->iterator;
   struct GNUNET_SET_Element element;
-  const struct GNUNET_SET_IterResponseMessage *msg =
-    (const struct GNUNET_SET_IterResponseMessage *) mh;
+  const struct GNUNET_SET_IterResponseMessage *msg;
   struct GNUNET_SET_IterAckMessage *ack_msg;
   struct GNUNET_MQ_Envelope *ev;
+  uint16_t msize;
 
-  if (NULL == set->iterator)
-    return;
-
-  element.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_IterResponseMessage);
-  element.type = htons (msg->element_type);
-  element.data = &msg[1];
-  set->iterator (set->iterator_cls, &element);
-  ev = GNUNET_MQ_msg (ack_msg, GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
-  ack_msg->send_more = htonl (1);
+  msize = ntohs (mh->size);
+  if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
+  {
+    /* message malformed */
+    GNUNET_break (0);
+    set->iterator = NULL;
+    set->iteration_id++;
+    iter (set->iterator_cls,
+          NULL);
+    iter = NULL;
+  }
+  msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
+  if (set->iteration_id != ntohs (msg->iteration_id))
+  {
+    /* element from a previous iteration, skip! */
+    iter = NULL;
+  }
+  if (NULL != iter)
+  {
+    element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
+    element.element_type = htons (msg->element_type);
+    element.data = &msg[1];
+    iter (set->iterator_cls,
+          &element);
+  }
+  ev = GNUNET_MQ_msg (ack_msg,
+                      GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
+  ack_msg->send_more = htonl ((NULL != iter));
   GNUNET_MQ_send (set->mq, ev);
 }
 
 
 /**
- * Handle element for iteration over the set.
+ * Handle message signalling conclusion of iteration over the set.
+ * Notifies the iterator that we are done.
  *
  * @param cls the set
  * @param mh the message
  */
 static void
-handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_iter_done (void *cls,
+                  const struct GNUNET_MessageHeader *mh)
 {
   struct GNUNET_SET_Handle *set = cls;
+  GNUNET_SET_ElementIterator iter = set->iterator;
 
-  if (NULL == set->iterator)
+  if (NULL == iter)
     return;
-
-  set->iterator (set->iterator_cls, NULL);
+  set->iterator = NULL;
+  set->iteration_id++;
+  iter (set->iterator_cls,
+        NULL);
 }
 
 
@@ -263,119 +294,145 @@ handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh)
  * @param mh the message
  */
 static void
-handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_result (void *cls,
+               const struct GNUNET_MessageHeader *mh)
 {
-  const struct GNUNET_SET_ResultMessage *msg;
   struct GNUNET_SET_Handle *set = cls;
+  const struct GNUNET_SET_ResultMessage *msg;
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_Element e;
   enum GNUNET_SET_Status result_status;
 
   msg = (const struct GNUNET_SET_ResultMessage *) mh;
-  GNUNET_assert (NULL != set);
   GNUNET_assert (NULL != set->mq);
-
   result_status = ntohs (msg->result_status);
-
-  oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
-  GNUNET_assert (NULL != oh);
-  /* status is not STATUS_OK => there's no attached element,
-   * and this is the last result message we get */
+  oh = GNUNET_MQ_assoc_get (set->mq,
+                            ntohl (msg->request_id));
+  if (NULL == oh)
+  {
+    /* 'oh' can be NULL if we canceled the operation, but the service
+       did not get the cancel message yet. */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Ignoring result from canceled operation\n");
+    return;
+  }
   if (GNUNET_SET_STATUS_OK != result_status)
   {
+    /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
+     * and this is the last result message we get */
     GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
-    GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
-    if (GNUNET_YES == oh->set->destroy_requested)
-      GNUNET_SET_destroy (oh->set);
+    GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                                 set->ops_tail,
+                                 oh);
+    if ( (GNUNET_YES == set->destroy_requested) &&
+         (NULL == set->ops_head) )
+      GNUNET_SET_destroy (set);
     if (NULL != oh->result_cb)
-      oh->result_cb (oh->result_cls, NULL, result_status);
+      oh->result_cb (oh->result_cls,
+                     NULL,
+                     result_status);
     GNUNET_free (oh);
     return;
   }
-
   e.data = &msg[1];
   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
-  e.type = msg->element_type;
+  e.element_type = msg->element_type;
   if (NULL != oh->result_cb)
-    oh->result_cb (oh->result_cls, &e, result_status);
+    oh->result_cb (oh->result_cls,
+                   &e,
+                   result_status);
 }
 
 
 /**
- * Handle request message for a listen operation
+ * Destroy the given set operation.
  *
- * @param cls the listen handle
- * @param mh the message
+ * @param oh set operation to destroy
  */
 static void
-handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
+set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
 {
-  const struct GNUNET_SET_RequestMessage *msg = (const struct GNUNET_SET_RequestMessage *) mh;
-  struct GNUNET_SET_ListenHandle *lh = cls;
-  struct GNUNET_SET_Request *req;
-  struct GNUNET_MessageHeader *context_msg;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "processing operation request\n");
-  req = GNUNET_new (struct GNUNET_SET_Request);
-  req->accept_id = ntohl (msg->accept_id);
-  context_msg = GNUNET_MQ_extract_nested_mh (msg);
-  /* calling GNUNET_SET_accept in the listen cb will set req->accepted */
-  lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req);
+  struct GNUNET_SET_Handle *set = oh->set;
+  struct GNUNET_SET_OperationHandle *h_assoc;
 
-  /* we got another request => reset the backoff */
-  lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
-
-  if (GNUNET_NO == req->accepted)
+  if (NULL != oh->conclude_mqm)
+    GNUNET_MQ_discard (oh->conclude_mqm);
+  /* is the operation already commited? */
+  if (NULL != set)
   {
-    struct GNUNET_MQ_Envelope *mqm;
-    struct GNUNET_SET_AcceptRejectMessage *amsg;
-
-    mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
-    /* no request id, as we refused */
-    amsg->request_id = htonl (0);
-    amsg->accept_reject_id = msg->accept_id;
-    GNUNET_MQ_send (lh->mq, mqm);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n");
+    GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                                 set->ops_tail,
+                                 oh);
+    h_assoc = GNUNET_MQ_assoc_remove (set->mq,
+                                      oh->request_id);
+    GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
   }
-  GNUNET_free (req);
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n");
-
-  /* the accept-case is handled in GNUNET_SET_accept,
-   * as we have the accept message available there */
+  GNUNET_free (oh);
 }
 
 
-static void
-handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error)
+/**
+ * Cancel the given set operation.  We need to send an explicit cancel
+ * message, as all operations one one set communicate using one
+ * handle.
+ *
+ * @param oh set operation to cancel
+ */
+void
+GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
 {
-  struct GNUNET_SET_ListenHandle *lh = cls;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listener broke down, re-connecting\n");
-  GNUNET_CLIENT_disconnect (lh->client);
-  lh->client = NULL;
-  GNUNET_MQ_destroy (lh->mq);
-  lh->mq = NULL;
+  struct GNUNET_SET_Handle *set = oh->set;
+  struct GNUNET_SET_CancelMessage *m;
+  struct GNUNET_MQ_Envelope *mqm;
 
-  GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, listen_connect, lh);
-  lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
+  if (NULL != set)
+  {
+    mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
+    m->request_id = htonl (oh->request_id);
+    GNUNET_MQ_send (set->mq, mqm);
+  }
+  set_operation_destroy (oh);
+  if ( (NULL != set) &&
+       (GNUNET_YES == set->destroy_requested) &&
+       (NULL == set->ops_head) )
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Destroying set after operation cancel\n");
+    GNUNET_SET_destroy (set);
+  }
 }
 
 
+/**
+ * We encountered an error communicating with the set service while
+ * performing a set operation. Report to the application.
+ *
+ * @param cls the `struct GNUNET_SET_Handle`
+ * @param error error code
+ */
 static void
-handle_client_set_error (void *cls, enum GNUNET_MQ_Error error)
+handle_client_set_error (void *cls,
+                         enum GNUNET_MQ_Error error)
 {
   struct GNUNET_SET_Handle *set = cls;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handling client set error\n");
   while (NULL != set->ops_head)
   {
     if (NULL != set->ops_head->result_cb)
-      set->ops_head->result_cb (set->ops_head->result_cls, NULL,
+      set->ops_head->result_cb (set->ops_head->result_cls,
+                                NULL,
                                 GNUNET_SET_STATUS_FAILURE);
-    GNUNET_SET_operation_cancel (set->ops_head);
+    set_operation_destroy (set->ops_head);
   }
-
   set->invalid = GNUNET_YES;
+  if (GNUNET_YES == set->destroy_requested)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Destroying set after operation failure\n");
+    GNUNET_SET_destroy (set);
+  }
 }
 
 
@@ -394,35 +451,46 @@ struct GNUNET_SET_Handle *
 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
                    enum GNUNET_SET_OperationType op)
 {
-  struct GNUNET_SET_Handle *set;
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_CreateMessage *msg;
   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
-    {handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0},
-    {handle_iter_done, GNUNET_MESSAGE_TYPE_SET_ITER_DONE, 0},
+    { &handle_result,
+      GNUNET_MESSAGE_TYPE_SET_RESULT,
+      0 },
+    { &handle_iter_element,
+      GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
+      0 },
+    { &handle_iter_done,
+      GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
+      sizeof (struct GNUNET_MessageHeader) },
     GNUNET_MQ_HANDLERS_END
   };
+  struct GNUNET_SET_Handle *set;
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SET_CreateMessage *msg;
 
   set = GNUNET_new (struct GNUNET_SET_Handle);
   set->client = GNUNET_CLIENT_connect ("set", cfg);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n");
-  GNUNET_assert (NULL != set->client);
-  set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers,
-                                                   handle_client_set_error, set);
+  if (NULL == set->client)
+  {
+    GNUNET_free (set);
+    return NULL;
+  }
+  set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
+                                                   mq_handlers,
+                                                   &handle_client_set_error, set);
   GNUNET_assert (NULL != set->mq);
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
-  msg->operation = htons (op);
+  mqm = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_SET_CREATE);
+  msg->operation = htonl (op);
   GNUNET_MQ_send (set->mq, mqm);
   return set;
 }
 
 
 /**
- * Add an element to the given set.
- * After the element has been added (in the sense of being
- * transmitted to the set service), cont will be called.
- * Calls to add_element can be queued
+ * Add an element to the given set.  After the element has been added
+ * (in the sense of being transmitted to the set service), @a cont
+ * will be called.  Multiple calls to GNUNET_SET_add_element() can be
+ * queued.
  *
  * @param set set to add element to
  * @param element element to add to the set
@@ -446,27 +514,30 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
       cont (cont_cls);
     return GNUNET_SYSERR;
   }
-
-  mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
-  msg->element_type = element->type;
-  memcpy (&msg[1], element->data, element->size);
-  GNUNET_MQ_notify_sent (mqm, cont, cont_cls);
+  mqm = GNUNET_MQ_msg_extra (msg, element->size,
+                             GNUNET_MESSAGE_TYPE_SET_ADD);
+  msg->element_type = element->element_type;
+  memcpy (&msg[1],
+          element->data,
+          element->size);
+  GNUNET_MQ_notify_sent (mqm,
+                         cont, cont_cls);
   GNUNET_MQ_send (set->mq, mqm);
   return GNUNET_OK;
 }
 
 
 /**
- * Remove an element to the given set.
- * After the element has been removed (in the sense of the
- * request being transmitted to the set service), cont will be called.
- * Calls to remove_element can be queued
+ * Remove an element to the given set.  After the element has been
+ * removed (in the sense of the request being transmitted to the set
+ * service), @a cont will be called.  Multiple calls to
+ * GNUNET_SET_remove_element() can be queued
  *
  * @param set set to remove element from
  * @param element element to remove from the set
  * @param cont continuation called after the element has been removed
- * @param cont_cls closure for cont
- * @return GNUNET_OK on success, GNUNET_SYSERR if the
+ * @param cont_cls closure for @a cont
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
  *         set is invalid (e.g. the set service crashed)
  */
 int
@@ -484,31 +555,52 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
       cont (cont_cls);
     return GNUNET_SYSERR;
   }
-
-  mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE);
-  msg->element_type = element->type;
-  memcpy (&msg[1], element->data, element->size);
-  GNUNET_MQ_notify_sent (mqm, cont, cont_cls);
+  mqm = GNUNET_MQ_msg_extra (msg,
+                             element->size,
+                             GNUNET_MESSAGE_TYPE_SET_REMOVE);
+  msg->element_type = element->element_type;
+  memcpy (&msg[1],
+          element->data,
+          element->size);
+  GNUNET_MQ_notify_sent (mqm,
+                         cont, cont_cls);
   GNUNET_MQ_send (set->mq, mqm);
   return GNUNET_OK;
 }
 
 
 /**
- * Destroy the set handle, and free all associated resources.
+ * Destroy the set handle if no operations are left, mark the set
+ * for destruction otherwise.
+ *
+ * @param set set handle to destroy
  */
 void
 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
 {
+  /* destroying set while iterator is active is currently
+     not supported; we should expand the API to allow
+     clients to explicitly cancel the iteration! */
+  GNUNET_assert (NULL == set->iterator);
   if (NULL != set->ops_head)
   {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Set operations are pending, delaying set destruction\n");
     set->destroy_requested = GNUNET_YES;
     return;
   }
-  GNUNET_CLIENT_disconnect (set->client);
-  set->client = NULL;
-  GNUNET_MQ_destroy (set->mq);
-  set->mq = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Really destroying set\n");
+  if (NULL != set->client)
+  {
+    GNUNET_CLIENT_disconnect (set->client);
+    set->client = NULL;
+  }
+  if (NULL != set->mq)
+  {
+    GNUNET_MQ_destroy (set->mq);
+    set->mq = NULL;
+  }
   GNUNET_free (set);
 }
 
@@ -516,25 +608,21 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
 /**
  * Prepare a set operation to be evaluated with another peer.
  * The evaluation will not start until the client provides
- * a local set with GNUNET_SET_commit.
+ * a local set with #GNUNET_SET_commit().
  *
  * @param other_peer peer with the other set
  * @param app_id hash for the application using the set
  * @param context_msg additional information for the request
- * @param salt salt used for the set operation; sometimes set operations
- *        fail due to hash collisions, using a different salt for each operation
- *        makes it harder for an attacker to exploit this
  * @param result_mode specified how results will be returned,
- *        see 'GNUNET_SET_ResultMode'.
+ *        see `enum GNUNET_SET_ResultMode`.
  * @param result_cb called on error or success
- * @param result_cls closure for result_cb
+ * @param result_cls closure for @e result_cb
  * @return a handle to cancel the operation
  */
 struct GNUNET_SET_OperationHandle *
 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
                     const struct GNUNET_HashCode *app_id,
                     const struct GNUNET_MessageHeader *context_msg,
-                    uint16_t salt,
                     enum GNUNET_SET_ResultMode result_mode,
                     GNUNET_SET_ResultIterator result_cb,
                     void *result_cls)
@@ -546,13 +634,12 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
   oh->result_cb = result_cb;
   oh->result_cls = result_cls;
-
-  mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg);
-
+  mqm = GNUNET_MQ_msg_nested_mh (msg,
+                                 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
+                                 context_msg);
   msg->app_id = *app_id;
+  msg->result_mode = htonl (result_mode);
   msg->target_peer = *other_peer;
-  msg->salt = salt;
-  msg->reserved = 0;
   oh->conclude_mqm = mqm;
   oh->request_id_addr = &msg->request_id;
 
@@ -561,35 +648,131 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
 
 
 /**
- * Connect to the set service in order to listen
- * for request.
+ * Connect to the set service in order to listen for requests.
  *
- * @param cls the listen handle to connect
+ * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
  * @param tc task context if invoked as a task, NULL otherwise
  */
 static void
 listen_connect (void *cls,
-                const struct GNUNET_SCHEDULER_TaskContext *tc)
+                const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Handle request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param mh the message
+ */
+static void
+handle_request (void *cls,
+                const struct GNUNET_MessageHeader *mh)
 {
+  struct GNUNET_SET_ListenHandle *lh = cls;
+  const struct GNUNET_SET_RequestMessage *msg;
+  struct GNUNET_SET_Request req;
+  const struct GNUNET_MessageHeader *context_msg;
+  uint16_t msize;
   struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_ListenMessage *msg;
+  struct GNUNET_SET_RejectMessage *rmsg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Processing incoming operation request\n");
+  msize = ntohs (mh->size);
+  if (msize < sizeof (struct GNUNET_SET_RequestMessage))
+  {
+    GNUNET_break (0);
+    GNUNET_CLIENT_disconnect (lh->client);
+    lh->client = NULL;
+    GNUNET_MQ_destroy (lh->mq);
+    lh->mq = NULL;
+    lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
+                                                       &listen_connect, lh);
+    lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
+    return;
+  }
+  /* we got another valid request => reset the backoff */
+  lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  msg = (const struct GNUNET_SET_RequestMessage *) mh;
+  req.accept_id = ntohl (msg->accept_id);
+  req.accepted = GNUNET_NO;
+  context_msg = GNUNET_MQ_extract_nested_mh (msg);
+  /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
+  lh->listen_cb (lh->listen_cls,
+                 &msg->peer_id,
+                 context_msg,
+                 &req);
+  if (GNUNET_YES == req.accepted)
+    return; /* the accept-case is handled in #GNUNET_SET_accept() */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Rejecting request\n");
+  mqm = GNUNET_MQ_msg (rmsg,
+                       GNUNET_MESSAGE_TYPE_SET_REJECT);
+  rmsg->accept_reject_id = msg->accept_id;
+  GNUNET_MQ_send (lh->mq, mqm);
+}
+
+
+/**
+ * Our connection with the set service encountered an error,
+ * re-initialize with exponential back-off.
+ *
+ * @param cls the `struct GNUNET_SET_ListenHandle *`
+ * @param error reason for the disconnect
+ */
+static void
+handle_client_listener_error (void *cls,
+                              enum GNUNET_MQ_Error error)
+{
   struct GNUNET_SET_ListenHandle *lh = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Listener broke down (%d), re-connecting\n",
+       (int) error);
+  GNUNET_CLIENT_disconnect (lh->client);
+  lh->client = NULL;
+  GNUNET_MQ_destroy (lh->mq);
+  lh->mq = NULL;
+  lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
+                                                     &listen_connect, lh);
+  lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
+}
+
+
+/**
+ * Connect to the set service in order to listen for requests.
+ *
+ * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
+ * @param tc task context if invoked as a task, NULL otherwise
+ */
+static void
+listen_connect (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
+    { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
     GNUNET_MQ_HANDLERS_END
   };
+  struct GNUNET_SET_ListenHandle *lh = cls;
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SET_ListenMessage *msg;
 
+  if ( (NULL != tc) &&
+       (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Listener not reconnecting due to shutdown\n");
+    return;
+  }
+  lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (NULL == lh->client);
   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
   if (NULL == lh->client)
-  {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "could not connect to set (wrong configuration?), giving up listening\n");
     return;
-  }
   GNUNET_assert (NULL == lh->mq);
-  lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
-                                                  handle_client_listener_error, lh);
+  lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
+                                                  mq_handlers,
+                                                  &handle_client_listener_error, lh);
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
   msg->operation = htonl (lh->operation);
   msg->app_id = lh->app_id;
@@ -599,14 +782,14 @@ listen_connect (void *cls,
 
 /**
  * Wait for set operation requests for the given application id
- * 
+ *
  * @param cfg configuration to use for connecting to
  *            the set service, needs to be valid for the lifetime of the listen handle
  * @param operation operation we want to listen for
  * @param app_id id of the application that handles set operation requests
  * @param listen_cb called for each incoming request matching the operation
  *                  and application id
- * @param listen_cls handle for listen_cb
+ * @param listen_cls handle for @a listen_cb
  * @return a handle that can be used to cancel the listen operation
  */
 struct GNUNET_SET_ListenHandle *
@@ -626,6 +809,11 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
   lh->app_id = *app_id;
   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
   listen_connect (lh, NULL);
+  if (NULL == lh->client)
+  {
+    GNUNET_free (lh);
+    return NULL;
+  }
   return lh;
 }
 
@@ -638,9 +826,23 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
 void
 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "canceling listener\n");
-  GNUNET_MQ_destroy (lh->mq);
-  GNUNET_CLIENT_disconnect (lh->client);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Canceling listener\n");
+  if (NULL != lh->mq)
+  {
+    GNUNET_MQ_destroy (lh->mq);
+    lh->mq = NULL;
+  }
+  if (NULL != lh->client)
+  {
+    GNUNET_CLIENT_disconnect (lh->client);
+    lh->client = NULL;
+  }
+  if (GNUNET_SCHEDULER_NO_TASK != lh->reconnect_task)
+  {
+    GNUNET_SCHEDULER_cancel (lh->reconnect_task);
+    lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  }
   GNUNET_free (lh);
 }
 
@@ -649,14 +851,14 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
  * afterwards.
- * Call #GNUNET_SET_conclude to provide the local set to use for the operation,
- * and to begin the exchange with the remote peer. 
+ * Call #GNUNET_SET_commit to provide the local set to use for the operation,
+ * and to begin the exchange with the remote peer.
  *
  * @param request request to accept
  * @param result_mode specified how results will be returned,
- *        see 'GNUNET_SET_ResultMode'.
+ *        see `enum GNUNET_SET_ResultMode`.
  * @param result_cb callback for the results
- * @param result_cls closure for result_cb
+ * @param result_cls closure for @a result_cb
  * @return a handle to cancel the operation
  */
 struct GNUNET_SET_OperationHandle *
@@ -667,60 +869,22 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
 {
   struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_OperationHandle *oh;
-  struct GNUNET_SET_AcceptRejectMessage *msg;
+  struct GNUNET_SET_AcceptMessage *msg;
 
-  GNUNET_assert (NULL != request);
   GNUNET_assert (GNUNET_NO == request->accepted);
   request->accepted = GNUNET_YES;
-
+  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+  msg->accept_reject_id = htonl (request->accept_id);
+  msg->result_mode = htonl (result_mode);
   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
   oh->result_cb = result_cb;
   oh->result_cls = result_cls;
-
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
-  msg->accept_reject_id = htonl (request->accept_id);
-
   oh->conclude_mqm = mqm;
   oh->request_id_addr = &msg->request_id;
-
   return oh;
 }
 
 
-/**
- * Cancel the given set operation.
- * We need to send an explicit cancel message, as
- * all operations communicate with the set's client
- * handle.
- *
- * @param oh set operation to cancel
- */
-void
-GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
-{
-  if (NULL != oh->conclude_mqm)
-    GNUNET_MQ_discard (oh->conclude_mqm);
-
-  /* is the operation already commited? */
-  if (NULL != oh->set)
-  {
-    struct GNUNET_SET_OperationHandle *h_assoc;
-    struct GNUNET_MQ_Envelope *mqm;
-
-    GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
-    h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
-    GNUNET_assert ((h_assoc == NULL) || (h_assoc == oh));
-    mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
-    GNUNET_MQ_send (oh->set->mq, mqm);
-
-    if (GNUNET_YES == oh->set->destroy_requested)
-      GNUNET_SET_destroy (oh->set);
-  }
-
-  GNUNET_free (oh);
-}
-
-
 /**
  * Commit a set to be used with a set operation.
  * This function is called once we have fully constructed
@@ -729,7 +893,7 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
  * set information and call the result callback with the
  * result information.
  *
- * @param oh handle to the set operation 
+ * @param oh handle to the set operation
  * @param set the set to use for the operation
  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
  *         set is invalid (e.g. the set service crashed)
@@ -743,7 +907,9 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
     return GNUNET_SYSERR;
   GNUNET_assert (NULL != oh->conclude_mqm);
   oh->set = set;
-  GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, oh);
+  GNUNET_CONTAINER_DLL_insert (set->ops_head,
+                               set->ops_tail,
+                               oh);
   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
   *oh->request_id_addr = htonl (oh->request_id);
   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
@@ -754,34 +920,53 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
 
 
 /**
- * Iterate over all elements in the given set.
- * Note that this operation involves transferring every element of the set
- * from the service to the client, and is thus costly.
+ * Iterate over all elements in the given set.  Note that this
+ * operation involves transferring every element of the set from the
+ * service to the client, and is thus costly.
  *
  * @param set the set to iterate over
  * @param iter the iterator to call for each element
- * @param cls closure for @a iter
+ * @param iter_cls closure for @a iter
  * @return #GNUNET_YES if the iteration started successfuly,
  *         #GNUNET_NO if another iteration is active
  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
  */
 int
-GNUNET_SET_iterate (struct GNUNET_SET_Handle *set, GNUNET_SET_ElementIterator iter, void *cls)
+GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
+                    GNUNET_SET_ElementIterator iter,
+                    void *iter_cls)
 {
   struct GNUNET_MQ_Envelope *ev;
 
   GNUNET_assert (NULL != iter);
-
   if (GNUNET_YES == set->invalid)
     return GNUNET_SYSERR;
   if (NULL != set->iterator)
     return GNUNET_NO;
-
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Iterating over set\n");
   set->iterator = iter;
-  set->iterator_cls = cls;
+  set->iterator_cls = iter_cls;
   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
   GNUNET_MQ_send (set->mq, ev);
   return GNUNET_YES;
 }
 
+
+/**
+ * Stop iteration over all elements in the given set.  Can only
+ * be called before the iteration has "naturally" completed its
+ * turn.
+ *
+ * @param set the set to stop iterating over
+ */
+void
+GNUNET_SET_iterate_cancel (struct GNUNET_SET_Handle *set)
+{
+  GNUNET_assert (NULL != set->iterator);
+  set->iterator = NULL;
+  set->iteration_id++;
+}
+
+
 /* end of set_api.c */