+
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_set_service.h"
30 #include "set.h"
31
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
34
35 struct SetCopyRequest
36 {
37   struct SetCopyRequest *next;
38
39   struct SetCopyRequest *prev;
40
41   void *cls;
42
43   GNUNET_SET_CopyReadyCallback cb;
44 };
45
46 /**
47  * Opaque handle to a set.
48  */
49 struct GNUNET_SET_Handle
50 {
51   /**
52    * Message queue for @e client.
53    */
54   struct GNUNET_MQ_Handle *mq;
55
56   /**
57    * Linked list of operations on the set.
58    */
59   struct GNUNET_SET_OperationHandle *ops_head;
60
61   /**
62    * Linked list of operations on the set.
63    */
64   struct GNUNET_SET_OperationHandle *ops_tail;
65
66   /**
67    * Callback for the current iteration over the set,
68    * NULL if no iterator is active.
69    */
70   GNUNET_SET_ElementIterator iterator;
71
72   /**
73    * Closure for @e iterator
74    */
75   void *iterator_cls;
76
77   /**
78    * Should the set be destroyed once all operations are gone?
79    * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
80    * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
81    */
82   int destroy_requested;
83
84   /**
85    * Has the set become invalid (e.g. service died)?
86    */
87   int invalid;
88
89   /**
90    * Both client and service count the number of iterators
91    * created so far to match replies with iterators.
92    */
93   uint16_t iteration_id;
94
95   /**
96    * Configuration, needed when creating (lazy) copies.
97    */
98   const struct GNUNET_CONFIGURATION_Handle *cfg;
99
100   /**
101    * Doubly linked list of copy requests.
102    */
103   struct SetCopyRequest *copy_req_head;
104
105   /**
106    * Doubly linked list of copy requests.
107    */
108   struct SetCopyRequest *copy_req_tail;
109 };
110
111
112 /**
113  * Handle for a set operation request from another peer.
114  */
115 struct GNUNET_SET_Request
116 {
117   /**
118    * Id of the request, used to identify the request when
119    * accepting/rejecting it.
120    */
121   uint32_t accept_id;
122
123   /**
124    * Has the request been accepted already?
125    * #GNUNET_YES/#GNUNET_NO
126    */
127   int accepted;
128 };
129
130
131 /**
132  * Handle to an operation.  Only known to the service after committing
133  * the handle with a set.
134  */
135 struct GNUNET_SET_OperationHandle
136 {
137   /**
138    * Function to be called when we have a result,
139    * or an error.
140    */
141   GNUNET_SET_ResultIterator result_cb;
142
143   /**
144    * Closure for @e result_cb.
145    */
146   void *result_cls;
147
148   /**
149    * Local set used for the operation,
150    * NULL if no set has been provided by conclude yet.
151    */
152   struct GNUNET_SET_Handle *set;
153
154   /**
155    * Message sent to the server on calling conclude,
156    * NULL if conclude has been called.
157    */
158   struct GNUNET_MQ_Envelope *conclude_mqm;
159
160   /**
161    * Address of the request if in the conclude message,
162    * used to patch the request id into the message when the set is known.
163    */
164   uint32_t *request_id_addr;
165
166   /**
167    * Handles are kept in a linked list.
168    */
169   struct GNUNET_SET_OperationHandle *prev;
170
171   /**
172    * Handles are kept in a linked list.
173    */
174   struct GNUNET_SET_OperationHandle *next;
175
176   /**
177    * Request ID to identify the operation within the set.
178    */
179   uint32_t request_id;
180 };
181
182
183 /**
184  * Opaque handle to a listen operation.
185  */
186 struct GNUNET_SET_ListenHandle
187 {
188
189   /**
190    * Message queue for the client.
191    */
192   struct GNUNET_MQ_Handle* mq;
193
194   /**
195    * Configuration handle for the listener, stored
196    * here to be able to reconnect transparently on
197    * connection failure.
198    */
199   const struct GNUNET_CONFIGURATION_Handle *cfg;
200
201   /**
202    * Function to call on a new incoming request,
203    * or on error.
204    */
205   GNUNET_SET_ListenCallback listen_cb;
206
207   /**
208    * Closure for @e listen_cb.
209    */
210   void *listen_cls;
211
212   /**
213    * Application ID we listen for.
214    */
215   struct GNUNET_HashCode app_id;
216
217   /**
218    * Time to wait until we try to reconnect on failure.
219    */
220   struct GNUNET_TIME_Relative reconnect_backoff;
221
222   /**
223    * Task for reconnecting when the listener fails.
224    */
225   struct GNUNET_SCHEDULER_Task *reconnect_task;
226
227   /**
228    * Operation we listen for.
229    */
230   enum GNUNET_SET_OperationType operation;
231 };
232
233
234 /* mutual recursion with handle_copy_lazy */
235 static struct GNUNET_SET_Handle *
236 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
237                  enum GNUNET_SET_OperationType op,
238                  const uint32_t *cookie);
239
240
241 /**
242  * Handle element for iteration over the set.  Notifies the
243  * iterator and sends an acknowledgement to the service.
244  *
245  * @param cls the `struct GNUNET_SET_Handle *`
246  * @param msg the message
247  */
248 static void
249 handle_copy_lazy (void *cls,
250                   const struct GNUNET_SET_CopyLazyResponseMessage *msg)
251 {
252   struct GNUNET_SET_Handle *set = cls;
253   struct SetCopyRequest *req;
254   struct GNUNET_SET_Handle *new_set;
255
256   req = set->copy_req_head;
257   if (NULL == req)
258   {
259     /* Service sent us unsolicited lazy copy response */
260     GNUNET_break (0);
261     return;
262   }
263
264   LOG (GNUNET_ERROR_TYPE_DEBUG,
265        "Handling response to lazy copy\n");
266   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
267                                set->copy_req_tail,
268                                req);
269   // We pass none as operation here, since it doesn't matter when
270   // cloning.
271   new_set = create_internal (set->cfg,
272                              GNUNET_SET_OPERATION_NONE,
273                              &msg->cookie);
274   req->cb (req->cls, new_set);
275   GNUNET_free (req);
276 }
277
278
279 /**
280  * Check that the given @a msg is well-formed.
281  *
282  * @param cls closure
283  * @param msg message to check
284  * @return #GNUNET_OK if message is well-formed
285  */
286 static int
287 check_iter_element (void *cls,
288                     const struct GNUNET_SET_IterResponseMessage *msg)
289 {
290   /* minimum size was already checked, everything else is OK! */
291   return GNUNET_OK;
292 }
293
294
295 /**
296  * Handle element for iteration over the set.  Notifies the
297  * iterator and sends an acknowledgement to the service.
298  *
299  * @param cls the `struct GNUNET_SET_Handle *`
300  * @param mh the message
301  */
302 static void
303 handle_iter_element (void *cls,
304                      const struct GNUNET_SET_IterResponseMessage *msg)
305 {
306   struct GNUNET_SET_Handle *set = cls;
307   GNUNET_SET_ElementIterator iter = set->iterator;
308   struct GNUNET_SET_Element element;
309   struct GNUNET_SET_IterAckMessage *ack_msg;
310   struct GNUNET_MQ_Envelope *ev;
311   uint16_t msize;
312
313   LOG (GNUNET_ERROR_TYPE_DEBUG,
314        "Received element in set iteration\n");
315   msize = ntohs (msg->header.size);
316   if (set->iteration_id != ntohs (msg->iteration_id))
317   {
318     /* element from a previous iteration, skip! */
319     iter = NULL;
320   }
321   if (NULL != iter)
322   {
323     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
324     element.element_type = ntohs (msg->element_type);
325     element.data = &msg[1];
326     iter (set->iterator_cls,
327           &element);
328   }
329   ev = GNUNET_MQ_msg (ack_msg,
330                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
331   ack_msg->send_more = htonl ((NULL != iter));
332   GNUNET_MQ_send (set->mq, ev);
333 }
334
335
336 /**
337  * Handle message signalling conclusion of iteration over the set.
338  * Notifies the iterator that we are done.
339  *
340  * @param cls the set
341  * @param mh the message
342  */
343 static void
344 handle_iter_done (void *cls,
345                   const struct GNUNET_MessageHeader *mh)
346 {
347   struct GNUNET_SET_Handle *set = cls;
348   GNUNET_SET_ElementIterator iter = set->iterator;
349
350   if (NULL == iter)
351   {
352     /* FIXME: if this is true, could cancel+start a fresh one
353        cause elements to go to the wrong iteration? */
354     LOG (GNUNET_ERROR_TYPE_INFO,
355          "Service completed set iteration that was already cancelled\n");
356     return;
357   }
358   LOG (GNUNET_ERROR_TYPE_DEBUG,
359        "Set iteration completed\n");
360   set->destroy_requested = GNUNET_SYSERR;
361   set->iterator = NULL;
362   set->iteration_id++;
363   iter (set->iterator_cls,
364         NULL);
365   if (GNUNET_SYSERR == set->destroy_requested)
366     set->destroy_requested = GNUNET_NO;
367   if (GNUNET_YES == set->destroy_requested)
368     GNUNET_SET_destroy (set);
369 }
370
371
372 /**
373  * Check that the given @a msg is well-formed.
374  *
375  * @param cls closure
376  * @param msg message to check
377  * @return #GNUNET_OK if message is well-formed
378  */
379 static int
380 check_result (void *cls,
381               const struct GNUNET_SET_ResultMessage *msg)
382 {
383   /* minimum size was already checked, everything else is OK! */
384   return GNUNET_OK;
385 }
386
387
388 /**
389  * Handle result message for a set operation.
390  *
391  * @param cls the set
392  * @param mh the message
393  */
394 static void
395 handle_result (void *cls,
396                const struct GNUNET_SET_ResultMessage *msg)
397 {
398   struct GNUNET_SET_Handle *set = cls;
399   struct GNUNET_SET_OperationHandle *oh;
400   struct GNUNET_SET_Element e;
401   enum GNUNET_SET_Status result_status;
402   int destroy_set;
403
404   GNUNET_assert (NULL != set->mq);
405   result_status = (enum GNUNET_SET_Status) ntohs (msg->result_status);
406   LOG (GNUNET_ERROR_TYPE_DEBUG,
407        "Got result message with status %d\n",
408        result_status);
409
410   oh = GNUNET_MQ_assoc_get (set->mq,
411                             ntohl (msg->request_id));
412   if (NULL == oh)
413   {
414     /* 'oh' can be NULL if we canceled the operation, but the service
415        did not get the cancel message yet. */
416     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417                 "Ignoring result from canceled operation\n");
418     return;
419   }
420
421   switch (result_status)
422   {
423     case GNUNET_SET_STATUS_OK:
424     case GNUNET_SET_STATUS_ADD_LOCAL:
425     case GNUNET_SET_STATUS_ADD_REMOTE:
426       goto do_element;
427     case GNUNET_SET_STATUS_FAILURE:
428     case GNUNET_SET_STATUS_DONE:
429       goto do_final;
430     case GNUNET_SET_STATUS_HALF_DONE:
431       /* not used anymore */
432       GNUNET_assert (0);
433   }
434
435 do_final:
436   LOG (GNUNET_ERROR_TYPE_DEBUG,
437        "Treating result as final status\n");
438   GNUNET_MQ_assoc_remove (set->mq,
439                           ntohl (msg->request_id));
440   GNUNET_CONTAINER_DLL_remove (set->ops_head,
441                                set->ops_tail,
442                                oh);
443   /* Need to do this calculation _before_ the result callback,
444      as IF the application still has a valid set handle, it
445      may trigger destruction of the set during the callback. */
446   destroy_set = (GNUNET_YES == set->destroy_requested) &&
447                 (NULL == set->ops_head);
448   if (NULL != oh->result_cb)
449   {
450     oh->result_cb (oh->result_cls,
451                    NULL,
452                    GNUNET_ntohll (msg->current_size),
453                    result_status);
454   }
455   else
456   {
457     LOG (GNUNET_ERROR_TYPE_DEBUG,
458          "No callback for final status\n");
459   }
460   if (destroy_set)
461     GNUNET_SET_destroy (set);
462   GNUNET_free (oh);
463   return;
464
465 do_element:
466   LOG (GNUNET_ERROR_TYPE_DEBUG,
467        "Treating result as element\n");
468   e.data = &msg[1];
469   e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
470   e.element_type = ntohs (msg->element_type);
471   if (NULL != oh->result_cb)
472     oh->result_cb (oh->result_cls,
473                    &e,
474                    GNUNET_ntohll (msg->current_size),
475                    result_status);
476 }
477
478
479 /**
480  * Destroy the given set operation.
481  *
482  * @param oh set operation to destroy
483  */
484 static void
485 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
486 {
487   struct GNUNET_SET_Handle *set = oh->set;
488   struct GNUNET_SET_OperationHandle *h_assoc;
489
490   if (NULL != oh->conclude_mqm)
491     GNUNET_MQ_discard (oh->conclude_mqm);
492   /* is the operation already commited? */
493   if (NULL != set)
494   {
495     GNUNET_CONTAINER_DLL_remove (set->ops_head,
496                                  set->ops_tail,
497                                  oh);
498     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
499                                       oh->request_id);
500     GNUNET_assert ( (NULL == h_assoc) ||
501                     (h_assoc == oh) );
502   }
503   GNUNET_free (oh);
504 }
505
506
507 /**
508  * Cancel the given set operation.  We need to send an explicit cancel
509  * message, as all operations one one set communicate using one
510  * handle.
511  *
512  * @param oh set operation to cancel
513  */
514 void
515 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
516 {
517   struct GNUNET_SET_Handle *set = oh->set;
518   struct GNUNET_SET_CancelMessage *m;
519   struct GNUNET_MQ_Envelope *mqm;
520
521   LOG (GNUNET_ERROR_TYPE_DEBUG,
522        "Cancelling SET operation\n");
523   if (NULL != set)
524   {
525     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
526     m->request_id = htonl (oh->request_id);
527     GNUNET_MQ_send (set->mq, mqm);
528   }
529   set_operation_destroy (oh);
530   if ( (NULL != set) &&
531        (GNUNET_YES == set->destroy_requested) &&
532        (NULL == set->ops_head) )
533   {
534     LOG (GNUNET_ERROR_TYPE_DEBUG,
535          "Destroying set after operation cancel\n");
536     GNUNET_SET_destroy (set);
537   }
538 }
539
540
541 /**
542  * We encountered an error communicating with the set service while
543  * performing a set operation. Report to the application.
544  *
545  * @param cls the `struct GNUNET_SET_Handle`
546  * @param error error code
547  */
548 static void
549 handle_client_set_error (void *cls,
550                          enum GNUNET_MQ_Error error)
551 {
552   struct GNUNET_SET_Handle *set = cls;
553   GNUNET_SET_ElementIterator iter = set->iterator;
554
555   LOG (GNUNET_ERROR_TYPE_ERROR,
556        "Handling client set error %d\n",
557        error);
558   while (NULL != set->ops_head)
559   {
560     if ( (NULL != set->ops_head->result_cb) &&
561          (GNUNET_NO == set->destroy_requested) )
562       set->ops_head->result_cb (set->ops_head->result_cls,
563                                 NULL,
564                                 0,
565                                 GNUNET_SET_STATUS_FAILURE);
566     set_operation_destroy (set->ops_head);
567   }
568   set->iterator = NULL;
569   set->iteration_id++;
570   set->invalid = GNUNET_YES;
571   if (NULL != iter)
572     iter (set->iterator_cls,
573           NULL);
574 }
575
576
577 /**
578  * FIXME.
579  */
580 static struct GNUNET_SET_Handle *
581 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
582                  enum GNUNET_SET_OperationType op,
583                  const uint32_t *cookie)
584 {
585   struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
586   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
587     GNUNET_MQ_hd_var_size (result,
588                            GNUNET_MESSAGE_TYPE_SET_RESULT,
589                            struct GNUNET_SET_ResultMessage,
590                            set),
591     GNUNET_MQ_hd_var_size (iter_element,
592                            GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
593                            struct GNUNET_SET_IterResponseMessage,
594                            set),
595     GNUNET_MQ_hd_fixed_size (iter_done,
596                              GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
597                              struct GNUNET_MessageHeader,
598                              set),
599     GNUNET_MQ_hd_fixed_size (copy_lazy,
600                              GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
601                              struct GNUNET_SET_CopyLazyResponseMessage,
602                              set),
603     GNUNET_MQ_handler_end ()
604   };
605   struct GNUNET_MQ_Envelope *mqm;
606   struct GNUNET_SET_CreateMessage *create_msg;
607   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
608
609   set->cfg = cfg;
610   set->mq = GNUNET_CLIENT_connect (cfg,
611                                    "set",
612                                    mq_handlers,
613                                    &handle_client_set_error,
614                                    set);
615   if (NULL == set->mq)
616   {
617     GNUNET_free (set);
618     return NULL;
619   }
620   if (NULL == cookie)
621   {
622     LOG (GNUNET_ERROR_TYPE_DEBUG,
623          "Creating new set (operation %u)\n",
624          op);
625     mqm = GNUNET_MQ_msg (create_msg,
626                          GNUNET_MESSAGE_TYPE_SET_CREATE);
627     create_msg->operation = htonl (op);
628   }
629   else
630   {
631     LOG (GNUNET_ERROR_TYPE_DEBUG,
632          "Creating new set (lazy copy)\n",
633          op);
634     mqm = GNUNET_MQ_msg (copy_msg,
635                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
636     copy_msg->cookie = *cookie;
637   }
638   GNUNET_MQ_send (set->mq,
639                   mqm);
640   return set;
641 }
642
643
644 /**
645  * Create an empty set, supporting the specified operation.
646  *
647  * @param cfg configuration to use for connecting to the
648  *        set service
649  * @param op operation supported by the set
650  *        Note that the operation has to be specified
651  *        beforehand, as certain set operations need to maintain
652  *        data structures spefific to the operation
653  * @return a handle to the set
654  */
655 struct GNUNET_SET_Handle *
656 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
657                    enum GNUNET_SET_OperationType op)
658 {
659   struct GNUNET_SET_Handle *set;
660
661   set = create_internal (cfg,
662                           op,
663                           NULL);
664   LOG (GNUNET_ERROR_TYPE_DEBUG,
665        "Creating set %p for operation %d\n",
666        set,
667        op);
668   return set;
669 }
670
671
672 /**
673  * Add an element to the given set.  After the element has been added
674  * (in the sense of being transmitted to the set service), @a cont
675  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
676  * queued.
677  *
678  * @param set set to add element to
679  * @param element element to add to the set
680  * @param cont continuation called after the element has been added
681  * @param cont_cls closure for @a cont
682  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
683  *         set is invalid (e.g. the set service crashed)
684  */
685 int
686 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
687                         const struct GNUNET_SET_Element *element,
688                         GNUNET_SET_Continuation cont,
689                         void *cont_cls)
690 {
691   struct GNUNET_MQ_Envelope *mqm;
692   struct GNUNET_SET_ElementMessage *msg;
693
694   LOG (GNUNET_ERROR_TYPE_DEBUG,
695        "adding element of type %u to set %p\n",
696        (unsigned int) element->element_type,
697        set);
698   GNUNET_assert (NULL != set);
699   if (GNUNET_YES == set->invalid)
700   {
701     if (NULL != cont)
702       cont (cont_cls);
703     return GNUNET_SYSERR;
704   }
705   mqm = GNUNET_MQ_msg_extra (msg,
706                              element->size,
707                              GNUNET_MESSAGE_TYPE_SET_ADD);
708   msg->element_type = htons (element->element_type);
709   GNUNET_memcpy (&msg[1],
710                  element->data,
711                  element->size);
712   GNUNET_MQ_notify_sent (mqm,
713                          cont, cont_cls);
714   GNUNET_MQ_send (set->mq, mqm);
715   return GNUNET_OK;
716 }
717
718
719 /**
720  * Remove an element to the given set.  After the element has been
721  * removed (in the sense of the request being transmitted to the set
722  * service), @a cont will be called.  Multiple calls to
723  * GNUNET_SET_remove_element() can be queued
724  *
725  * @param set set to remove element from
726  * @param element element to remove from the set
727  * @param cont continuation called after the element has been removed
728  * @param cont_cls closure for @a cont
729  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
730  *         set is invalid (e.g. the set service crashed)
731  */
732 int
733 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
734                            const struct GNUNET_SET_Element *element,
735                            GNUNET_SET_Continuation cont,
736                            void *cont_cls)
737 {
738   struct GNUNET_MQ_Envelope *mqm;
739   struct GNUNET_SET_ElementMessage *msg;
740
741   LOG (GNUNET_ERROR_TYPE_DEBUG,
742        "Removing element from set %p\n",
743        set);
744   if (GNUNET_YES == set->invalid)
745   {
746     if (NULL != cont)
747       cont (cont_cls);
748     return GNUNET_SYSERR;
749   }
750   mqm = GNUNET_MQ_msg_extra (msg,
751                              element->size,
752                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
753   msg->element_type = htons (element->element_type);
754   GNUNET_memcpy (&msg[1],
755                  element->data,
756                  element->size);
757   GNUNET_MQ_notify_sent (mqm,
758                          cont, cont_cls);
759   GNUNET_MQ_send (set->mq, mqm);
760   return GNUNET_OK;
761 }
762
763
764 /**
765  * Destroy the set handle if no operations are left, mark the set
766  * for destruction otherwise.
767  *
768  * @param set set handle to destroy
769  */
770 void
771 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
772 {
773   /* destroying set while iterator is active is currently
774      not supported; we should expand the API to allow
775      clients to explicitly cancel the iteration! */
776   GNUNET_assert (NULL != set);
777   if ( (NULL != set->ops_head) ||
778        (NULL != set->iterator) ||
779        (GNUNET_SYSERR == set->destroy_requested) )
780   {
781     LOG (GNUNET_ERROR_TYPE_DEBUG,
782          "Set operations are pending, delaying set destruction\n");
783     set->destroy_requested = GNUNET_YES;
784     return;
785   }
786   LOG (GNUNET_ERROR_TYPE_DEBUG,
787        "Really destroying set\n");
788   if (NULL != set->mq)
789   {
790     GNUNET_MQ_destroy (set->mq);
791     set->mq = NULL;
792   }
793   GNUNET_free (set);
794 }
795
796
797 /**
798  * Prepare a set operation to be evaluated with another peer.
799  * The evaluation will not start until the client provides
800  * a local set with #GNUNET_SET_commit().
801  *
802  * @param other_peer peer with the other set
803  * @param app_id hash for the application using the set
804  * @param context_msg additional information for the request
805  * @param result_mode specified how results will be returned,
806  *        see `enum GNUNET_SET_ResultMode`.
807  * @param result_cb called on error or success
808  * @param result_cls closure for @e result_cb
809  * @return a handle to cancel the operation
810  */
811 struct GNUNET_SET_OperationHandle *
812 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
813                     const struct GNUNET_HashCode *app_id,
814                     const struct GNUNET_MessageHeader *context_msg,
815                     enum GNUNET_SET_ResultMode result_mode,
816                     struct GNUNET_SET_Option options[],
817                     GNUNET_SET_ResultIterator result_cb,
818                     void *result_cls)
819 {
820   struct GNUNET_MQ_Envelope *mqm;
821   struct GNUNET_SET_OperationHandle *oh;
822   struct GNUNET_SET_EvaluateMessage *msg;
823   struct GNUNET_SET_Option *opt;
824
825   LOG (GNUNET_ERROR_TYPE_DEBUG,
826        "Client prepares set operation (%d)\n",
827        result_mode);
828   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
829   oh->result_cb = result_cb;
830   oh->result_cls = result_cls;
831   mqm = GNUNET_MQ_msg_nested_mh (msg,
832                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
833                                  context_msg);
834   msg->app_id = *app_id;
835   msg->result_mode = htonl (result_mode);
836   msg->target_peer = *other_peer;
837   for (opt = options; opt->type != 0; opt++)
838   {
839     switch (opt->type)
840     {
841       case GNUNET_SET_OPTION_BYZANTINE:
842         msg->byzantine = GNUNET_YES;
843         msg->byzantine_lower_bound = opt->v.num;
844         break;
845       case GNUNET_SET_OPTION_FORCE_FULL:
846         msg->force_full = GNUNET_YES;
847         break;
848       case GNUNET_SET_OPTION_FORCE_DELTA:
849         msg->force_delta = GNUNET_YES;
850         break;
851       default:
852         LOG (GNUNET_ERROR_TYPE_ERROR,
853              "Option with type %d not recognized\n", (int) opt->type);
854     }
855   }
856   oh->conclude_mqm = mqm;
857   oh->request_id_addr = &msg->request_id;
858
859   return oh;
860 }
861
862
863 /**
864  * Connect to the set service in order to listen for requests.
865  *
866  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
867  */
868 static void
869 listen_connect (void *cls);
870
871
872 /**
873  * Check validity of request message for a listen operation
874  *
875  * @param cls the listen handle
876  * @param msg the message
877  * @return #GNUNET_OK if the message is well-formed
878  */
879 static int
880 check_request (void *cls,
881                const struct GNUNET_SET_RequestMessage *msg)
882 {
883   const struct GNUNET_MessageHeader *context_msg;
884
885   if (ntohs (msg->header.size) == sizeof (*msg))
886     return GNUNET_OK; /* no context message is OK */
887   context_msg = GNUNET_MQ_extract_nested_mh (msg);
888   if (NULL == context_msg)
889   {
890     /* malformed context message is NOT ok */
891     GNUNET_break_op (0);
892     return GNUNET_SYSERR;
893   }
894   return GNUNET_OK;
895 }
896
897
898 /**
899  * Handle request message for a listen operation
900  *
901  * @param cls the listen handle
902  * @param msg the message
903  */
904 static void
905 handle_request (void *cls,
906                 const struct GNUNET_SET_RequestMessage *msg)
907 {
908   struct GNUNET_SET_ListenHandle *lh = cls;
909   struct GNUNET_SET_Request req;
910   const struct GNUNET_MessageHeader *context_msg;
911   struct GNUNET_MQ_Envelope *mqm;
912   struct GNUNET_SET_RejectMessage *rmsg;
913
914   LOG (GNUNET_ERROR_TYPE_DEBUG,
915        "Processing incoming operation request with id %u\n",
916        ntohl (msg->accept_id));
917   /* we got another valid request => reset the backoff */
918   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
919   req.accept_id = ntohl (msg->accept_id);
920   req.accepted = GNUNET_NO;
921   context_msg = GNUNET_MQ_extract_nested_mh (msg);
922   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
923   lh->listen_cb (lh->listen_cls,
924                  &msg->peer_id,
925                  context_msg,
926                  &req);
927   if (GNUNET_YES == req.accepted)
928     return; /* the accept-case is handled in #GNUNET_SET_accept() */
929   LOG (GNUNET_ERROR_TYPE_DEBUG,
930        "Rejected request %u\n",
931        ntohl (msg->accept_id));
932   mqm = GNUNET_MQ_msg (rmsg,
933                        GNUNET_MESSAGE_TYPE_SET_REJECT);
934   rmsg->accept_reject_id = msg->accept_id;
935   GNUNET_MQ_send (lh->mq, mqm);
936 }
937
938
939 /**
940  * Our connection with the set service encountered an error,
941  * re-initialize with exponential back-off.
942  *
943  * @param cls the `struct GNUNET_SET_ListenHandle *`
944  * @param error reason for the disconnect
945  */
946 static void
947 handle_client_listener_error (void *cls,
948                               enum GNUNET_MQ_Error error)
949 {
950   struct GNUNET_SET_ListenHandle *lh = cls;
951
952   LOG (GNUNET_ERROR_TYPE_DEBUG,
953        "Listener broke down (%d), re-connecting\n",
954        (int) error);
955   GNUNET_MQ_destroy (lh->mq);
956   lh->mq = NULL;
957   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
958                                                      &listen_connect,
959                                                      lh);
960   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
961 }
962
963
964 /**
965  * Connect to the set service in order to listen for requests.
966  *
967  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
968  */
969 static void
970 listen_connect (void *cls)
971 {
972   struct GNUNET_SET_ListenHandle *lh = cls;
973   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
974     GNUNET_MQ_hd_var_size (request,
975                            GNUNET_MESSAGE_TYPE_SET_REQUEST,
976                            struct GNUNET_SET_RequestMessage,
977                            lh),
978     GNUNET_MQ_handler_end ()
979   };
980   struct GNUNET_MQ_Envelope *mqm;
981   struct GNUNET_SET_ListenMessage *msg;
982
983   lh->reconnect_task = NULL;
984   GNUNET_assert (NULL == lh->mq);
985   lh->mq = GNUNET_CLIENT_connect (lh->cfg,
986                                   "set",
987                                   mq_handlers,
988                                   &handle_client_listener_error,
989                                   lh);
990   if (NULL == lh->mq)
991     return;
992   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
993   msg->operation = htonl (lh->operation);
994   msg->app_id = lh->app_id;
995   GNUNET_MQ_send (lh->mq,
996                   mqm);
997 }
998
999
1000 /**
1001  * Wait for set operation requests for the given application id
1002  *
1003  * @param cfg configuration to use for connecting to
1004  *            the set service, needs to be valid for the lifetime of the listen handle
1005  * @param operation operation we want to listen for
1006  * @param app_id id of the application that handles set operation requests
1007  * @param listen_cb called for each incoming request matching the operation
1008  *                  and application id
1009  * @param listen_cls handle for @a listen_cb
1010  * @return a handle that can be used to cancel the listen operation
1011  */
1012 struct GNUNET_SET_ListenHandle *
1013 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1014                    enum GNUNET_SET_OperationType operation,
1015                    const struct GNUNET_HashCode *app_id,
1016                    GNUNET_SET_ListenCallback listen_cb,
1017                    void *listen_cls)
1018 {
1019   struct GNUNET_SET_ListenHandle *lh;
1020
1021   LOG (GNUNET_ERROR_TYPE_DEBUG,
1022        "Starting listener for app %s\n",
1023        GNUNET_h2s (app_id));
1024   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
1025   lh->listen_cb = listen_cb;
1026   lh->listen_cls = listen_cls;
1027   lh->cfg = cfg;
1028   lh->operation = operation;
1029   lh->app_id = *app_id;
1030   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1031   listen_connect (lh);
1032   if (NULL == lh->mq)
1033   {
1034     GNUNET_free (lh);
1035     return NULL;
1036   }
1037   return lh;
1038 }
1039
1040
1041 /**
1042  * Cancel the given listen operation.
1043  *
1044  * @param lh handle for the listen operation
1045  */
1046 void
1047 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
1048 {
1049   LOG (GNUNET_ERROR_TYPE_DEBUG,
1050        "Canceling listener %s\n",
1051        GNUNET_h2s (&lh->app_id));
1052   if (NULL != lh->mq)
1053   {
1054     GNUNET_MQ_destroy (lh->mq);
1055     lh->mq = NULL;
1056   }
1057   if (NULL != lh->reconnect_task)
1058   {
1059     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
1060     lh->reconnect_task = NULL;
1061   }
1062   GNUNET_free (lh);
1063 }
1064
1065
1066 /**
1067  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
1068  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
1069  * afterwards.
1070  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
1071  * and to begin the exchange with the remote peer.
1072  *
1073  * @param request request to accept
1074  * @param result_mode specified how results will be returned,
1075  *        see `enum GNUNET_SET_ResultMode`.
1076  * @param result_cb callback for the results
1077  * @param result_cls closure for @a result_cb
1078  * @return a handle to cancel the operation
1079  */
1080 struct GNUNET_SET_OperationHandle *
1081 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1082                    enum GNUNET_SET_ResultMode result_mode,
1083                    struct GNUNET_SET_Option options[],
1084                    GNUNET_SET_ResultIterator result_cb,
1085                    void *result_cls)
1086 {
1087   struct GNUNET_MQ_Envelope *mqm;
1088   struct GNUNET_SET_OperationHandle *oh;
1089   struct GNUNET_SET_AcceptMessage *msg;
1090
1091   GNUNET_assert (GNUNET_NO == request->accepted);
1092   LOG (GNUNET_ERROR_TYPE_DEBUG,
1093        "Client accepts set operation (%d) with id %u\n",
1094        result_mode,
1095        request->accept_id);
1096   request->accepted = GNUNET_YES;
1097   mqm = GNUNET_MQ_msg (msg,
1098                        GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1099   msg->accept_reject_id = htonl (request->accept_id);
1100   msg->result_mode = htonl (result_mode);
1101   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1102   oh->result_cb = result_cb;
1103   oh->result_cls = result_cls;
1104   oh->conclude_mqm = mqm;
1105   oh->request_id_addr = &msg->request_id;
1106   return oh;
1107 }
1108
1109
1110 /**
1111  * Commit a set to be used with a set operation.
1112  * This function is called once we have fully constructed
1113  * the set that we want to use for the operation.  At this
1114  * time, the P2P protocol can then begin to exchange the
1115  * set information and call the result callback with the
1116  * result information.
1117  *
1118  * @param oh handle to the set operation
1119  * @param set the set to use for the operation
1120  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1121  *         set is invalid (e.g. the set service crashed)
1122  */
1123 int
1124 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1125                    struct GNUNET_SET_Handle *set)
1126 {
1127   if (NULL != oh->set)
1128   {
1129     /* Some other set was already committed for this
1130      * operation, there is a logic bug in the client of this API */
1131     GNUNET_break (0);
1132     return GNUNET_OK;
1133   }
1134   GNUNET_assert (NULL != set);
1135   if (GNUNET_YES == set->invalid)
1136     return GNUNET_SYSERR;
1137   LOG (GNUNET_ERROR_TYPE_DEBUG,
1138        "Client commits to SET\n");
1139   GNUNET_assert (NULL != oh->conclude_mqm);
1140   oh->set = set;
1141   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1142                                set->ops_tail,
1143                                oh);
1144   oh->request_id = GNUNET_MQ_assoc_add (set->mq,
1145                                         oh);
1146   *oh->request_id_addr = htonl (oh->request_id);
1147   GNUNET_MQ_send (set->mq,
1148                   oh->conclude_mqm);
1149   oh->conclude_mqm = NULL;
1150   oh->request_id_addr = NULL;
1151   return GNUNET_OK;
1152 }
1153
1154
1155 /**
1156  * Iterate over all elements in the given set.  Note that this
1157  * operation involves transferring every element of the set from the
1158  * service to the client, and is thus costly.
1159  *
1160  * @param set the set to iterate over
1161  * @param iter the iterator to call for each element
1162  * @param iter_cls closure for @a iter
1163  * @return #GNUNET_YES if the iteration started successfuly,
1164  *         #GNUNET_NO if another iteration is active
1165  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1166  */
1167 int
1168 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1169                     GNUNET_SET_ElementIterator iter,
1170                     void *iter_cls)
1171 {
1172   struct GNUNET_MQ_Envelope *ev;
1173
1174   GNUNET_assert (NULL != iter);
1175   if (GNUNET_YES == set->invalid)
1176     return GNUNET_SYSERR;
1177   if (NULL != set->iterator)
1178     return GNUNET_NO;
1179   LOG (GNUNET_ERROR_TYPE_DEBUG,
1180        "Iterating over set\n");
1181   set->iterator = iter;
1182   set->iterator_cls = iter_cls;
1183   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1184   GNUNET_MQ_send (set->mq, ev);
1185   return GNUNET_YES;
1186 }
1187
1188
1189 void
1190 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1191                       GNUNET_SET_CopyReadyCallback cb,
1192                       void *cls)
1193 {
1194   struct GNUNET_MQ_Envelope *ev;
1195   struct SetCopyRequest *req;
1196
1197   LOG (GNUNET_ERROR_TYPE_DEBUG,
1198        "Creating lazy copy of set\n");
1199   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1200   GNUNET_MQ_send (set->mq, ev);
1201
1202   req = GNUNET_new (struct SetCopyRequest);
1203   req->cb = cb;
1204   req->cls = cls;
1205   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1206                                set->copy_req_tail,
1207                                req);
1208 }
1209
1210
1211 /**
1212  * Create a copy of an element.  The copy
1213  * must be GNUNET_free-d by the caller.
1214  *
1215  * @param element the element to copy
1216  * @return the copied element
1217  */
1218 struct GNUNET_SET_Element *
1219 GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
1220 {
1221   struct GNUNET_SET_Element *copy;
1222
1223   copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
1224   copy->size = element->size;
1225   copy->element_type = element->element_type;
1226   copy->data = &copy[1];
1227   GNUNET_memcpy (&copy[1],
1228                  element->data,
1229                  copy->size);
1230   return copy;
1231 }
1232
1233
1234 /**
1235  * Hash a set element.
1236  *
1237  * @param element the element that should be hashed
1238  * @param[out] ret_hash a pointer to where the hash of @a element
1239  *        should be stored
1240  */
1241 void
1242 GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
1243                          struct GNUNET_HashCode *ret_hash)
1244 {
1245   struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
1246
1247   /* It's not guaranteed that the element data is always after the element header,
1248      so we need to hash the chunks separately. */
1249   GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
1250   GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
1251   GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
1252   GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
1253 }
1254
1255 /* end of set_api.c */