-comments: the world ain't all male
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file set/set_api.c
20  * @brief api for the set service
21  * @author Florian Dold
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_protocols.h"
27 #include "gnunet_set_service.h"
28 #include "set.h"
29
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
32
33 struct SetCopyRequest
34 {
35   struct SetCopyRequest *next;
36
37   struct SetCopyRequest *prev;
38
39   void *cls;
40
41   GNUNET_SET_CopyReadyCallback cb;
42 };
43
44 /**
45  * Opaque handle to a set.
46  */
47 struct GNUNET_SET_Handle
48 {
49   /**
50    * Message queue for @e client.
51    */
52   struct GNUNET_MQ_Handle *mq;
53
54   /**
55    * Linked list of operations on the set.
56    */
57   struct GNUNET_SET_OperationHandle *ops_head;
58
59   /**
60    * Linked list of operations on the set.
61    */
62   struct GNUNET_SET_OperationHandle *ops_tail;
63
64   /**
65    * Callback for the current iteration over the set,
66    * NULL if no iterator is active.
67    */
68   GNUNET_SET_ElementIterator iterator;
69
70   /**
71    * Closure for @e iterator
72    */
73   void *iterator_cls;
74
75   /**
76    * Should the set be destroyed once all operations are gone?
77    * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
78    * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
79    */
80   int destroy_requested;
81
82   /**
83    * Has the set become invalid (e.g. service died)?
84    */
85   int invalid;
86
87   /**
88    * Both client and service count the number of iterators
89    * created so far to match replies with iterators.
90    */
91   uint16_t iteration_id;
92
93   /**
94    * Configuration, needed when creating (lazy) copies.
95    */
96   const struct GNUNET_CONFIGURATION_Handle *cfg;
97
98   /**
99    * Doubly linked list of copy requests.
100    */
101   struct SetCopyRequest *copy_req_head;
102
103   /**
104    * Doubly linked list of copy requests.
105    */
106   struct SetCopyRequest *copy_req_tail;
107 };
108
109
110 /**
111  * Handle for a set operation request from another peer.
112  */
113 struct GNUNET_SET_Request
114 {
115   /**
116    * Id of the request, used to identify the request when
117    * accepting/rejecting it.
118    */
119   uint32_t accept_id;
120
121   /**
122    * Has the request been accepted already?
123    * #GNUNET_YES/#GNUNET_NO
124    */
125   int accepted;
126 };
127
128
129 /**
130  * Handle to an operation.  Only known to the service after committing
131  * the handle with a set.
132  */
133 struct GNUNET_SET_OperationHandle
134 {
135   /**
136    * Function to be called when we have a result,
137    * or an error.
138    */
139   GNUNET_SET_ResultIterator result_cb;
140
141   /**
142    * Closure for @e result_cb.
143    */
144   void *result_cls;
145
146   /**
147    * Local set used for the operation,
148    * NULL if no set has been provided by conclude yet.
149    */
150   struct GNUNET_SET_Handle *set;
151
152   /**
153    * Message sent to the server on calling conclude,
154    * NULL if conclude has been called.
155    */
156   struct GNUNET_MQ_Envelope *conclude_mqm;
157
158   /**
159    * Address of the request if in the conclude message,
160    * used to patch the request id into the message when the set is known.
161    */
162   uint32_t *request_id_addr;
163
164   /**
165    * Handles are kept in a linked list.
166    */
167   struct GNUNET_SET_OperationHandle *prev;
168
169   /**
170    * Handles are kept in a linked list.
171    */
172   struct GNUNET_SET_OperationHandle *next;
173
174   /**
175    * Request ID to identify the operation within the set.
176    */
177   uint32_t request_id;
178 };
179
180
181 /**
182  * Opaque handle to a listen operation.
183  */
184 struct GNUNET_SET_ListenHandle
185 {
186
187   /**
188    * Message queue for the client.
189    */
190   struct GNUNET_MQ_Handle* mq;
191
192   /**
193    * Configuration handle for the listener, stored
194    * here to be able to reconnect transparently on
195    * connection failure.
196    */
197   const struct GNUNET_CONFIGURATION_Handle *cfg;
198
199   /**
200    * Function to call on a new incoming request,
201    * or on error.
202    */
203   GNUNET_SET_ListenCallback listen_cb;
204
205   /**
206    * Closure for @e listen_cb.
207    */
208   void *listen_cls;
209
210   /**
211    * Application ID we listen for.
212    */
213   struct GNUNET_HashCode app_id;
214
215   /**
216    * Time to wait until we try to reconnect on failure.
217    */
218   struct GNUNET_TIME_Relative reconnect_backoff;
219
220   /**
221    * Task for reconnecting when the listener fails.
222    */
223   struct GNUNET_SCHEDULER_Task *reconnect_task;
224
225   /**
226    * Operation we listen for.
227    */
228   enum GNUNET_SET_OperationType operation;
229 };
230
231
232 /* mutual recursion with handle_copy_lazy */
233 static struct GNUNET_SET_Handle *
234 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
235                  enum GNUNET_SET_OperationType op,
236                  const uint32_t *cookie);
237
238
239 /**
240  * Handle element for iteration over the set.  Notifies the
241  * iterator and sends an acknowledgement to the service.
242  *
243  * @param cls the `struct GNUNET_SET_Handle *`
244  * @param msg the message
245  */
246 static void
247 handle_copy_lazy (void *cls,
248                   const struct GNUNET_SET_CopyLazyResponseMessage *msg)
249 {
250   struct GNUNET_SET_Handle *set = cls;
251   struct SetCopyRequest *req;
252   struct GNUNET_SET_Handle *new_set;
253
254   req = set->copy_req_head;
255   if (NULL == req)
256   {
257     /* Service sent us unsolicited lazy copy response */
258     GNUNET_break (0);
259     return;
260   }
261
262   LOG (GNUNET_ERROR_TYPE_DEBUG,
263        "Handling response to lazy copy\n");
264   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
265                                set->copy_req_tail,
266                                req);
267   // We pass none as operation here, since it doesn't matter when
268   // cloning.
269   new_set = create_internal (set->cfg,
270                              GNUNET_SET_OPERATION_NONE,
271                              &msg->cookie);
272   req->cb (req->cls, new_set);
273   GNUNET_free (req);
274 }
275
276
277 /**
278  * Check that the given @a msg is well-formed.
279  *
280  * @param cls closure
281  * @param msg message to check
282  * @return #GNUNET_OK if message is well-formed
283  */
284 static int
285 check_iter_element (void *cls,
286                     const struct GNUNET_SET_IterResponseMessage *msg)
287 {
288   /* minimum size was already checked, everything else is OK! */
289   return GNUNET_OK;
290 }
291
292
293 /**
294  * Handle element for iteration over the set.  Notifies the
295  * iterator and sends an acknowledgement to the service.
296  *
297  * @param cls the `struct GNUNET_SET_Handle *`
298  * @param mh the message
299  */
300 static void
301 handle_iter_element (void *cls,
302                      const struct GNUNET_SET_IterResponseMessage *msg)
303 {
304   struct GNUNET_SET_Handle *set = cls;
305   GNUNET_SET_ElementIterator iter = set->iterator;
306   struct GNUNET_SET_Element element;
307   struct GNUNET_SET_IterAckMessage *ack_msg;
308   struct GNUNET_MQ_Envelope *ev;
309   uint16_t msize;
310
311   LOG (GNUNET_ERROR_TYPE_DEBUG,
312        "Received element in set iteration\n");
313   msize = ntohs (msg->header.size);
314   if (set->iteration_id != ntohs (msg->iteration_id))
315   {
316     /* element from a previous iteration, skip! */
317     iter = NULL;
318   }
319   if (NULL != iter)
320   {
321     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
322     element.element_type = ntohs (msg->element_type);
323     element.data = &msg[1];
324     iter (set->iterator_cls,
325           &element);
326   }
327   ev = GNUNET_MQ_msg (ack_msg,
328                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
329   ack_msg->send_more = htonl ((NULL != iter));
330   GNUNET_MQ_send (set->mq, ev);
331 }
332
333
334 /**
335  * Handle message signalling conclusion of iteration over the set.
336  * Notifies the iterator that we are done.
337  *
338  * @param cls the set
339  * @param mh the message
340  */
341 static void
342 handle_iter_done (void *cls,
343                   const struct GNUNET_MessageHeader *mh)
344 {
345   struct GNUNET_SET_Handle *set = cls;
346   GNUNET_SET_ElementIterator iter = set->iterator;
347
348   if (NULL == iter)
349   {
350     /* FIXME: if this is true, could cancel+start a fresh one
351        cause elements to go to the wrong iteration? */
352     LOG (GNUNET_ERROR_TYPE_INFO,
353          "Service completed set iteration that was already cancelled\n");
354     return;
355   }
356   LOG (GNUNET_ERROR_TYPE_DEBUG,
357        "Set iteration completed\n");
358   set->destroy_requested = GNUNET_SYSERR;
359   set->iterator = NULL;
360   set->iteration_id++;
361   iter (set->iterator_cls,
362         NULL);
363   if (GNUNET_SYSERR == set->destroy_requested)
364     set->destroy_requested = GNUNET_NO;
365   if (GNUNET_YES == set->destroy_requested)
366     GNUNET_SET_destroy (set);
367 }
368
369
370 /**
371  * Check that the given @a msg is well-formed.
372  *
373  * @param cls closure
374  * @param msg message to check
375  * @return #GNUNET_OK if message is well-formed
376  */
377 static int
378 check_result (void *cls,
379               const struct GNUNET_SET_ResultMessage *msg)
380 {
381   /* minimum size was already checked, everything else is OK! */
382   return GNUNET_OK;
383 }
384
385
386 /**
387  * Handle result message for a set operation.
388  *
389  * @param cls the set
390  * @param mh the message
391  */
392 static void
393 handle_result (void *cls,
394                const struct GNUNET_SET_ResultMessage *msg)
395 {
396   struct GNUNET_SET_Handle *set = cls;
397   struct GNUNET_SET_OperationHandle *oh;
398   struct GNUNET_SET_Element e;
399   enum GNUNET_SET_Status result_status;
400   int destroy_set;
401
402   GNUNET_assert (NULL != set->mq);
403   result_status = (enum GNUNET_SET_Status) ntohs (msg->result_status);
404   LOG (GNUNET_ERROR_TYPE_DEBUG,
405        "Got result message with status %d\n",
406        result_status);
407
408   oh = GNUNET_MQ_assoc_get (set->mq,
409                             ntohl (msg->request_id));
410   if (NULL == oh)
411   {
412     /* 'oh' can be NULL if we canceled the operation, but the service
413        did not get the cancel message yet. */
414     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
415                 "Ignoring result from canceled operation\n");
416     return;
417   }
418
419   switch (result_status)
420   {
421     case GNUNET_SET_STATUS_OK:
422     case GNUNET_SET_STATUS_ADD_LOCAL:
423     case GNUNET_SET_STATUS_ADD_REMOTE:
424       goto do_element;
425     case GNUNET_SET_STATUS_FAILURE:
426     case GNUNET_SET_STATUS_DONE:
427       goto do_final;
428     case GNUNET_SET_STATUS_HALF_DONE:
429       /* not used anymore */
430       GNUNET_assert (0);
431   }
432
433 do_final:
434   LOG (GNUNET_ERROR_TYPE_DEBUG,
435        "Treating result as final status\n");
436   GNUNET_MQ_assoc_remove (set->mq,
437                           ntohl (msg->request_id));
438   GNUNET_CONTAINER_DLL_remove (set->ops_head,
439                                set->ops_tail,
440                                oh);
441   /* Need to do this calculation _before_ the result callback,
442      as IF the application still has a valid set handle, it
443      may trigger destruction of the set during the callback. */
444   destroy_set = (GNUNET_YES == set->destroy_requested) &&
445                 (NULL == set->ops_head);
446   if (NULL != oh->result_cb)
447   {
448     oh->result_cb (oh->result_cls,
449                    NULL,
450                    GNUNET_ntohll (msg->current_size),
451                    result_status);
452   }
453   else
454   {
455     LOG (GNUNET_ERROR_TYPE_DEBUG,
456          "No callback for final status\n");
457   }
458   if (destroy_set)
459     GNUNET_SET_destroy (set);
460   GNUNET_free (oh);
461   return;
462
463 do_element:
464   LOG (GNUNET_ERROR_TYPE_DEBUG,
465        "Treating result as element\n");
466   e.data = &msg[1];
467   e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
468   e.element_type = ntohs (msg->element_type);
469   if (NULL != oh->result_cb)
470     oh->result_cb (oh->result_cls,
471                    &e,
472                    GNUNET_ntohll (msg->current_size),
473                    result_status);
474 }
475
476
477 /**
478  * Destroy the given set operation.
479  *
480  * @param oh set operation to destroy
481  */
482 static void
483 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
484 {
485   struct GNUNET_SET_Handle *set = oh->set;
486   struct GNUNET_SET_OperationHandle *h_assoc;
487
488   if (NULL != oh->conclude_mqm)
489     GNUNET_MQ_discard (oh->conclude_mqm);
490   /* is the operation already commited? */
491   if (NULL != set)
492   {
493     GNUNET_CONTAINER_DLL_remove (set->ops_head,
494                                  set->ops_tail,
495                                  oh);
496     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
497                                       oh->request_id);
498     GNUNET_assert ( (NULL == h_assoc) ||
499                     (h_assoc == oh) );
500   }
501   GNUNET_free (oh);
502 }
503
504
505 /**
506  * Cancel the given set operation.  We need to send an explicit cancel
507  * message, as all operations one one set communicate using one
508  * handle.
509  *
510  * @param oh set operation to cancel
511  */
512 void
513 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
514 {
515   struct GNUNET_SET_Handle *set = oh->set;
516   struct GNUNET_SET_CancelMessage *m;
517   struct GNUNET_MQ_Envelope *mqm;
518
519   LOG (GNUNET_ERROR_TYPE_DEBUG,
520        "Cancelling SET operation\n");
521   if (NULL != set)
522   {
523     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
524     m->request_id = htonl (oh->request_id);
525     GNUNET_MQ_send (set->mq, mqm);
526   }
527   set_operation_destroy (oh);
528   if ( (NULL != set) &&
529        (GNUNET_YES == set->destroy_requested) &&
530        (NULL == set->ops_head) )
531   {
532     LOG (GNUNET_ERROR_TYPE_DEBUG,
533          "Destroying set after operation cancel\n");
534     GNUNET_SET_destroy (set);
535   }
536 }
537
538
539 /**
540  * We encountered an error communicating with the set service while
541  * performing a set operation. Report to the application.
542  *
543  * @param cls the `struct GNUNET_SET_Handle`
544  * @param error error code
545  */
546 static void
547 handle_client_set_error (void *cls,
548                          enum GNUNET_MQ_Error error)
549 {
550   struct GNUNET_SET_Handle *set = cls;
551   GNUNET_SET_ElementIterator iter = set->iterator;
552
553   LOG (GNUNET_ERROR_TYPE_ERROR,
554        "Handling client set error %d\n",
555        error);
556   while (NULL != set->ops_head)
557   {
558     if ( (NULL != set->ops_head->result_cb) &&
559          (GNUNET_NO == set->destroy_requested) )
560       set->ops_head->result_cb (set->ops_head->result_cls,
561                                 NULL,
562                                 0,
563                                 GNUNET_SET_STATUS_FAILURE);
564     set_operation_destroy (set->ops_head);
565   }
566   set->iterator = NULL;
567   set->iteration_id++;
568   set->invalid = GNUNET_YES;
569   if (NULL != iter)
570     iter (set->iterator_cls,
571           NULL);
572 }
573
574
575 /**
576  * FIXME.
577  */
578 static struct GNUNET_SET_Handle *
579 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
580                  enum GNUNET_SET_OperationType op,
581                  const uint32_t *cookie)
582 {
583   struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
584   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
585     GNUNET_MQ_hd_var_size (result,
586                            GNUNET_MESSAGE_TYPE_SET_RESULT,
587                            struct GNUNET_SET_ResultMessage,
588                            set),
589     GNUNET_MQ_hd_var_size (iter_element,
590                            GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
591                            struct GNUNET_SET_IterResponseMessage,
592                            set),
593     GNUNET_MQ_hd_fixed_size (iter_done,
594                              GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
595                              struct GNUNET_MessageHeader,
596                              set),
597     GNUNET_MQ_hd_fixed_size (copy_lazy,
598                              GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
599                              struct GNUNET_SET_CopyLazyResponseMessage,
600                              set),
601     GNUNET_MQ_handler_end ()
602   };
603   struct GNUNET_MQ_Envelope *mqm;
604   struct GNUNET_SET_CreateMessage *create_msg;
605   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
606
607   set->cfg = cfg;
608   set->mq = GNUNET_CLIENT_connect (cfg,
609                                    "set",
610                                    mq_handlers,
611                                    &handle_client_set_error,
612                                    set);
613   if (NULL == set->mq)
614   {
615     GNUNET_free (set);
616     return NULL;
617   }
618   if (NULL == cookie)
619   {
620     LOG (GNUNET_ERROR_TYPE_DEBUG,
621          "Creating new set (operation %u)\n",
622          op);
623     mqm = GNUNET_MQ_msg (create_msg,
624                          GNUNET_MESSAGE_TYPE_SET_CREATE);
625     create_msg->operation = htonl (op);
626   }
627   else
628   {
629     LOG (GNUNET_ERROR_TYPE_DEBUG,
630          "Creating new set (lazy copy)\n",
631          op);
632     mqm = GNUNET_MQ_msg (copy_msg,
633                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
634     copy_msg->cookie = *cookie;
635   }
636   GNUNET_MQ_send (set->mq,
637                   mqm);
638   return set;
639 }
640
641
642 /**
643  * Create an empty set, supporting the specified operation.
644  *
645  * @param cfg configuration to use for connecting to the
646  *        set service
647  * @param op operation supported by the set
648  *        Note that the operation has to be specified
649  *        beforehand, as certain set operations need to maintain
650  *        data structures spefific to the operation
651  * @return a handle to the set
652  */
653 struct GNUNET_SET_Handle *
654 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
655                    enum GNUNET_SET_OperationType op)
656 {
657   struct GNUNET_SET_Handle *set;
658
659   set = create_internal (cfg,
660                           op,
661                           NULL);
662   LOG (GNUNET_ERROR_TYPE_DEBUG,
663        "Creating set %p for operation %d\n",
664        set,
665        op);
666   return set;
667 }
668
669
670 /**
671  * Add an element to the given set.  After the element has been added
672  * (in the sense of being transmitted to the set service), @a cont
673  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
674  * queued.
675  *
676  * @param set set to add element to
677  * @param element element to add to the set
678  * @param cont continuation called after the element has been added
679  * @param cont_cls closure for @a cont
680  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
681  *         set is invalid (e.g. the set service crashed)
682  */
683 int
684 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
685                         const struct GNUNET_SET_Element *element,
686                         GNUNET_SET_Continuation cont,
687                         void *cont_cls)
688 {
689   struct GNUNET_MQ_Envelope *mqm;
690   struct GNUNET_SET_ElementMessage *msg;
691
692   LOG (GNUNET_ERROR_TYPE_DEBUG,
693        "adding element of type %u to set %p\n",
694        (unsigned int) element->element_type,
695        set);
696   GNUNET_assert (NULL != set);
697   if (GNUNET_YES == set->invalid)
698   {
699     if (NULL != cont)
700       cont (cont_cls);
701     return GNUNET_SYSERR;
702   }
703   mqm = GNUNET_MQ_msg_extra (msg,
704                              element->size,
705                              GNUNET_MESSAGE_TYPE_SET_ADD);
706   msg->element_type = htons (element->element_type);
707   GNUNET_memcpy (&msg[1],
708                  element->data,
709                  element->size);
710   GNUNET_MQ_notify_sent (mqm,
711                          cont, cont_cls);
712   GNUNET_MQ_send (set->mq, mqm);
713   return GNUNET_OK;
714 }
715
716
717 /**
718  * Remove an element to the given set.  After the element has been
719  * removed (in the sense of the request being transmitted to the set
720  * service), @a cont will be called.  Multiple calls to
721  * GNUNET_SET_remove_element() can be queued
722  *
723  * @param set set to remove element from
724  * @param element element to remove from the set
725  * @param cont continuation called after the element has been removed
726  * @param cont_cls closure for @a cont
727  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
728  *         set is invalid (e.g. the set service crashed)
729  */
730 int
731 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
732                            const struct GNUNET_SET_Element *element,
733                            GNUNET_SET_Continuation cont,
734                            void *cont_cls)
735 {
736   struct GNUNET_MQ_Envelope *mqm;
737   struct GNUNET_SET_ElementMessage *msg;
738
739   LOG (GNUNET_ERROR_TYPE_DEBUG,
740        "Removing element from set %p\n",
741        set);
742   if (GNUNET_YES == set->invalid)
743   {
744     if (NULL != cont)
745       cont (cont_cls);
746     return GNUNET_SYSERR;
747   }
748   mqm = GNUNET_MQ_msg_extra (msg,
749                              element->size,
750                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
751   msg->element_type = htons (element->element_type);
752   GNUNET_memcpy (&msg[1],
753                  element->data,
754                  element->size);
755   GNUNET_MQ_notify_sent (mqm,
756                          cont, cont_cls);
757   GNUNET_MQ_send (set->mq, mqm);
758   return GNUNET_OK;
759 }
760
761
762 /**
763  * Destroy the set handle if no operations are left, mark the set
764  * for destruction otherwise.
765  *
766  * @param set set handle to destroy
767  */
768 void
769 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
770 {
771   /* destroying set while iterator is active is currently
772      not supported; we should expand the API to allow
773      clients to explicitly cancel the iteration! */
774   GNUNET_assert (NULL != set);
775   if ( (NULL != set->ops_head) ||
776        (NULL != set->iterator) ||
777        (GNUNET_SYSERR == set->destroy_requested) )
778   {
779     LOG (GNUNET_ERROR_TYPE_DEBUG,
780          "Set operations are pending, delaying set destruction\n");
781     set->destroy_requested = GNUNET_YES;
782     return;
783   }
784   LOG (GNUNET_ERROR_TYPE_DEBUG,
785        "Really destroying set\n");
786   if (NULL != set->mq)
787   {
788     GNUNET_MQ_destroy (set->mq);
789     set->mq = NULL;
790   }
791   GNUNET_free (set);
792 }
793
794
795 /**
796  * Prepare a set operation to be evaluated with another peer.
797  * The evaluation will not start until the client provides
798  * a local set with #GNUNET_SET_commit().
799  *
800  * @param other_peer peer with the other set
801  * @param app_id hash for the application using the set
802  * @param context_msg additional information for the request
803  * @param result_mode specified how results will be returned,
804  *        see `enum GNUNET_SET_ResultMode`.
805  * @param result_cb called on error or success
806  * @param result_cls closure for @e result_cb
807  * @return a handle to cancel the operation
808  */
809 struct GNUNET_SET_OperationHandle *
810 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
811                     const struct GNUNET_HashCode *app_id,
812                     const struct GNUNET_MessageHeader *context_msg,
813                     enum GNUNET_SET_ResultMode result_mode,
814                     struct GNUNET_SET_Option options[],
815                     GNUNET_SET_ResultIterator result_cb,
816                     void *result_cls)
817 {
818   struct GNUNET_MQ_Envelope *mqm;
819   struct GNUNET_SET_OperationHandle *oh;
820   struct GNUNET_SET_EvaluateMessage *msg;
821   struct GNUNET_SET_Option *opt;
822
823   LOG (GNUNET_ERROR_TYPE_DEBUG,
824        "Client prepares set operation (%d)\n",
825        result_mode);
826   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
827   oh->result_cb = result_cb;
828   oh->result_cls = result_cls;
829   mqm = GNUNET_MQ_msg_nested_mh (msg,
830                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
831                                  context_msg);
832   msg->app_id = *app_id;
833   msg->result_mode = htonl (result_mode);
834   msg->target_peer = *other_peer;
835   for (opt = options; opt->type != 0; opt++)
836   {
837     switch (opt->type)
838     {
839       case GNUNET_SET_OPTION_BYZANTINE:
840         msg->byzantine = GNUNET_YES;
841         msg->byzantine_lower_bound = opt->v.num;
842         break;
843       case GNUNET_SET_OPTION_FORCE_FULL:
844         msg->force_full = GNUNET_YES;
845         break;
846       case GNUNET_SET_OPTION_FORCE_DELTA:
847         msg->force_delta = GNUNET_YES;
848         break;
849       default:
850         LOG (GNUNET_ERROR_TYPE_ERROR,
851              "Option with type %d not recognized\n", (int) opt->type);
852     }
853   }
854   oh->conclude_mqm = mqm;
855   oh->request_id_addr = &msg->request_id;
856
857   return oh;
858 }
859
860
861 /**
862  * Connect to the set service in order to listen for requests.
863  *
864  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
865  */
866 static void
867 listen_connect (void *cls);
868
869
870 /**
871  * Check validity of request message for a listen operation
872  *
873  * @param cls the listen handle
874  * @param msg the message
875  * @return #GNUNET_OK if the message is well-formed
876  */
877 static int
878 check_request (void *cls,
879                const struct GNUNET_SET_RequestMessage *msg)
880 {
881   const struct GNUNET_MessageHeader *context_msg;
882
883   if (ntohs (msg->header.size) == sizeof (*msg))
884     return GNUNET_OK; /* no context message is OK */
885   context_msg = GNUNET_MQ_extract_nested_mh (msg);
886   if (NULL == context_msg)
887   {
888     /* malformed context message is NOT ok */
889     GNUNET_break_op (0);
890     return GNUNET_SYSERR;
891   }
892   return GNUNET_OK;
893 }
894
895
896 /**
897  * Handle request message for a listen operation
898  *
899  * @param cls the listen handle
900  * @param msg the message
901  */
902 static void
903 handle_request (void *cls,
904                 const struct GNUNET_SET_RequestMessage *msg)
905 {
906   struct GNUNET_SET_ListenHandle *lh = cls;
907   struct GNUNET_SET_Request req;
908   const struct GNUNET_MessageHeader *context_msg;
909   struct GNUNET_MQ_Envelope *mqm;
910   struct GNUNET_SET_RejectMessage *rmsg;
911
912   LOG (GNUNET_ERROR_TYPE_DEBUG,
913        "Processing incoming operation request with id %u\n",
914        ntohl (msg->accept_id));
915   /* we got another valid request => reset the backoff */
916   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
917   req.accept_id = ntohl (msg->accept_id);
918   req.accepted = GNUNET_NO;
919   context_msg = GNUNET_MQ_extract_nested_mh (msg);
920   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
921   lh->listen_cb (lh->listen_cls,
922                  &msg->peer_id,
923                  context_msg,
924                  &req);
925   if (GNUNET_YES == req.accepted)
926     return; /* the accept-case is handled in #GNUNET_SET_accept() */
927   LOG (GNUNET_ERROR_TYPE_DEBUG,
928        "Rejected request %u\n",
929        ntohl (msg->accept_id));
930   mqm = GNUNET_MQ_msg (rmsg,
931                        GNUNET_MESSAGE_TYPE_SET_REJECT);
932   rmsg->accept_reject_id = msg->accept_id;
933   GNUNET_MQ_send (lh->mq, mqm);
934 }
935
936
937 /**
938  * Our connection with the set service encountered an error,
939  * re-initialize with exponential back-off.
940  *
941  * @param cls the `struct GNUNET_SET_ListenHandle *`
942  * @param error reason for the disconnect
943  */
944 static void
945 handle_client_listener_error (void *cls,
946                               enum GNUNET_MQ_Error error)
947 {
948   struct GNUNET_SET_ListenHandle *lh = cls;
949
950   LOG (GNUNET_ERROR_TYPE_DEBUG,
951        "Listener broke down (%d), re-connecting\n",
952        (int) error);
953   GNUNET_MQ_destroy (lh->mq);
954   lh->mq = NULL;
955   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
956                                                      &listen_connect,
957                                                      lh);
958   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
959 }
960
961
962 /**
963  * Connect to the set service in order to listen for requests.
964  *
965  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
966  */
967 static void
968 listen_connect (void *cls)
969 {
970   struct GNUNET_SET_ListenHandle *lh = cls;
971   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
972     GNUNET_MQ_hd_var_size (request,
973                            GNUNET_MESSAGE_TYPE_SET_REQUEST,
974                            struct GNUNET_SET_RequestMessage,
975                            lh),
976     GNUNET_MQ_handler_end ()
977   };
978   struct GNUNET_MQ_Envelope *mqm;
979   struct GNUNET_SET_ListenMessage *msg;
980
981   lh->reconnect_task = NULL;
982   GNUNET_assert (NULL == lh->mq);
983   lh->mq = GNUNET_CLIENT_connect (lh->cfg,
984                                   "set",
985                                   mq_handlers,
986                                   &handle_client_listener_error,
987                                   lh);
988   if (NULL == lh->mq)
989     return;
990   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
991   msg->operation = htonl (lh->operation);
992   msg->app_id = lh->app_id;
993   GNUNET_MQ_send (lh->mq,
994                   mqm);
995 }
996
997
998 /**
999  * Wait for set operation requests for the given application id
1000  *
1001  * @param cfg configuration to use for connecting to
1002  *            the set service, needs to be valid for the lifetime of the listen handle
1003  * @param operation operation we want to listen for
1004  * @param app_id id of the application that handles set operation requests
1005  * @param listen_cb called for each incoming request matching the operation
1006  *                  and application id
1007  * @param listen_cls handle for @a listen_cb
1008  * @return a handle that can be used to cancel the listen operation
1009  */
1010 struct GNUNET_SET_ListenHandle *
1011 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1012                    enum GNUNET_SET_OperationType operation,
1013                    const struct GNUNET_HashCode *app_id,
1014                    GNUNET_SET_ListenCallback listen_cb,
1015                    void *listen_cls)
1016 {
1017   struct GNUNET_SET_ListenHandle *lh;
1018
1019   LOG (GNUNET_ERROR_TYPE_DEBUG,
1020        "Starting listener for app %s\n",
1021        GNUNET_h2s (app_id));
1022   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
1023   lh->listen_cb = listen_cb;
1024   lh->listen_cls = listen_cls;
1025   lh->cfg = cfg;
1026   lh->operation = operation;
1027   lh->app_id = *app_id;
1028   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1029   listen_connect (lh);
1030   if (NULL == lh->mq)
1031   {
1032     GNUNET_free (lh);
1033     return NULL;
1034   }
1035   return lh;
1036 }
1037
1038
1039 /**
1040  * Cancel the given listen operation.
1041  *
1042  * @param lh handle for the listen operation
1043  */
1044 void
1045 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
1046 {
1047   LOG (GNUNET_ERROR_TYPE_DEBUG,
1048        "Canceling listener %s\n",
1049        GNUNET_h2s (&lh->app_id));
1050   if (NULL != lh->mq)
1051   {
1052     GNUNET_MQ_destroy (lh->mq);
1053     lh->mq = NULL;
1054   }
1055   if (NULL != lh->reconnect_task)
1056   {
1057     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
1058     lh->reconnect_task = NULL;
1059   }
1060   GNUNET_free (lh);
1061 }
1062
1063
1064 /**
1065  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
1066  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
1067  * afterwards.
1068  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
1069  * and to begin the exchange with the remote peer.
1070  *
1071  * @param request request to accept
1072  * @param result_mode specified how results will be returned,
1073  *        see `enum GNUNET_SET_ResultMode`.
1074  * @param result_cb callback for the results
1075  * @param result_cls closure for @a result_cb
1076  * @return a handle to cancel the operation
1077  */
1078 struct GNUNET_SET_OperationHandle *
1079 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1080                    enum GNUNET_SET_ResultMode result_mode,
1081                    struct GNUNET_SET_Option options[],
1082                    GNUNET_SET_ResultIterator result_cb,
1083                    void *result_cls)
1084 {
1085   struct GNUNET_MQ_Envelope *mqm;
1086   struct GNUNET_SET_OperationHandle *oh;
1087   struct GNUNET_SET_AcceptMessage *msg;
1088
1089   GNUNET_assert (GNUNET_NO == request->accepted);
1090   LOG (GNUNET_ERROR_TYPE_DEBUG,
1091        "Client accepts set operation (%d) with id %u\n",
1092        result_mode,
1093        request->accept_id);
1094   request->accepted = GNUNET_YES;
1095   mqm = GNUNET_MQ_msg (msg,
1096                        GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1097   msg->accept_reject_id = htonl (request->accept_id);
1098   msg->result_mode = htonl (result_mode);
1099   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1100   oh->result_cb = result_cb;
1101   oh->result_cls = result_cls;
1102   oh->conclude_mqm = mqm;
1103   oh->request_id_addr = &msg->request_id;
1104   return oh;
1105 }
1106
1107
1108 /**
1109  * Commit a set to be used with a set operation.
1110  * This function is called once we have fully constructed
1111  * the set that we want to use for the operation.  At this
1112  * time, the P2P protocol can then begin to exchange the
1113  * set information and call the result callback with the
1114  * result information.
1115  *
1116  * @param oh handle to the set operation
1117  * @param set the set to use for the operation
1118  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1119  *         set is invalid (e.g. the set service crashed)
1120  */
1121 int
1122 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1123                    struct GNUNET_SET_Handle *set)
1124 {
1125   if (NULL != oh->set)
1126   {
1127     /* Some other set was already committed for this
1128      * operation, there is a logic bug in the client of this API */
1129     GNUNET_break (0);
1130     return GNUNET_OK;
1131   }
1132   GNUNET_assert (NULL != set);
1133   if (GNUNET_YES == set->invalid)
1134     return GNUNET_SYSERR;
1135   LOG (GNUNET_ERROR_TYPE_DEBUG,
1136        "Client commits to SET\n");
1137   GNUNET_assert (NULL != oh->conclude_mqm);
1138   oh->set = set;
1139   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1140                                set->ops_tail,
1141                                oh);
1142   oh->request_id = GNUNET_MQ_assoc_add (set->mq,
1143                                         oh);
1144   *oh->request_id_addr = htonl (oh->request_id);
1145   GNUNET_MQ_send (set->mq,
1146                   oh->conclude_mqm);
1147   oh->conclude_mqm = NULL;
1148   oh->request_id_addr = NULL;
1149   return GNUNET_OK;
1150 }
1151
1152
1153 /**
1154  * Iterate over all elements in the given set.  Note that this
1155  * operation involves transferring every element of the set from the
1156  * service to the client, and is thus costly.
1157  *
1158  * @param set the set to iterate over
1159  * @param iter the iterator to call for each element
1160  * @param iter_cls closure for @a iter
1161  * @return #GNUNET_YES if the iteration started successfuly,
1162  *         #GNUNET_NO if another iteration is active
1163  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1164  */
1165 int
1166 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1167                     GNUNET_SET_ElementIterator iter,
1168                     void *iter_cls)
1169 {
1170   struct GNUNET_MQ_Envelope *ev;
1171
1172   GNUNET_assert (NULL != iter);
1173   if (GNUNET_YES == set->invalid)
1174     return GNUNET_SYSERR;
1175   if (NULL != set->iterator)
1176     return GNUNET_NO;
1177   LOG (GNUNET_ERROR_TYPE_DEBUG,
1178        "Iterating over set\n");
1179   set->iterator = iter;
1180   set->iterator_cls = iter_cls;
1181   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1182   GNUNET_MQ_send (set->mq, ev);
1183   return GNUNET_YES;
1184 }
1185
1186
1187 void
1188 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1189                       GNUNET_SET_CopyReadyCallback cb,
1190                       void *cls)
1191 {
1192   struct GNUNET_MQ_Envelope *ev;
1193   struct SetCopyRequest *req;
1194
1195   LOG (GNUNET_ERROR_TYPE_DEBUG,
1196        "Creating lazy copy of set\n");
1197   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1198   GNUNET_MQ_send (set->mq, ev);
1199
1200   req = GNUNET_new (struct SetCopyRequest);
1201   req->cb = cb;
1202   req->cls = cls;
1203   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1204                                set->copy_req_tail,
1205                                req);
1206 }
1207
1208
1209 /**
1210  * Create a copy of an element.  The copy
1211  * must be GNUNET_free-d by the caller.
1212  *
1213  * @param element the element to copy
1214  * @return the copied element
1215  */
1216 struct GNUNET_SET_Element *
1217 GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
1218 {
1219   struct GNUNET_SET_Element *copy;
1220
1221   copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
1222   copy->size = element->size;
1223   copy->element_type = element->element_type;
1224   copy->data = &copy[1];
1225   GNUNET_memcpy (&copy[1],
1226                  element->data,
1227                  copy->size);
1228   return copy;
1229 }
1230
1231
1232 /**
1233  * Hash a set element.
1234  *
1235  * @param element the element that should be hashed
1236  * @param[out] ret_hash a pointer to where the hash of @a element
1237  *        should be stored
1238  */
1239 void
1240 GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
1241                          struct GNUNET_HashCode *ret_hash)
1242 {
1243   struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
1244
1245   /* It's not guaranteed that the element data is always after the element header,
1246      so we need to hash the chunks separately. */
1247   GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
1248   GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
1249   GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
1250   GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
1251 }
1252
1253 /* end of set_api.c */