Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_set_service.h"
30 #include "set.h"
31
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
34
35 struct SetCopyRequest
36 {
37   struct SetCopyRequest *next;
38
39   struct SetCopyRequest *prev;
40
41   void *cls;
42
43   GNUNET_SET_CopyReadyCallback cb;
44 };
45
46 /**
47  * Opaque handle to a set.
48  */
49 struct GNUNET_SET_Handle
50 {
51   /**
52    * Message queue for @e client.
53    */
54   struct GNUNET_MQ_Handle *mq;
55
56   /**
57    * Linked list of operations on the set.
58    */
59   struct GNUNET_SET_OperationHandle *ops_head;
60
61   /**
62    * Linked list of operations on the set.
63    */
64   struct GNUNET_SET_OperationHandle *ops_tail;
65
66   /**
67    * Callback for the current iteration over the set,
68    * NULL if no iterator is active.
69    */
70   GNUNET_SET_ElementIterator iterator;
71
72   /**
73    * Closure for @e iterator
74    */
75   void *iterator_cls;
76
77   /**
78    * Should the set be destroyed once all operations are gone?
79    */
80   int destroy_requested;
81
82   /**
83    * Has the set become invalid (e.g. service died)?
84    */
85   int invalid;
86
87   /**
88    * Both client and service count the number of iterators
89    * created so far to match replies with iterators.
90    */
91   uint16_t iteration_id;
92
93   /**
94    * Configuration, needed when creating (lazy) copies.
95    */
96   const struct GNUNET_CONFIGURATION_Handle *cfg;
97
98   /**
99    * Doubly linked list of copy requests.
100    */
101   struct SetCopyRequest *copy_req_head;
102
103   /**
104    * Doubly linked list of copy requests.
105    */
106   struct SetCopyRequest *copy_req_tail;
107 };
108
109
110 /**
111  * Handle for a set operation request from another peer.
112  */
113 struct GNUNET_SET_Request
114 {
115   /**
116    * Id of the request, used to identify the request when
117    * accepting/rejecting it.
118    */
119   uint32_t accept_id;
120
121   /**
122    * Has the request been accepted already?
123    * #GNUNET_YES/#GNUNET_NO
124    */
125   int accepted;
126 };
127
128
129 /**
130  * Handle to an operation.  Only known to the service after committing
131  * the handle with a set.
132  */
133 struct GNUNET_SET_OperationHandle
134 {
135   /**
136    * Function to be called when we have a result,
137    * or an error.
138    */
139   GNUNET_SET_ResultIterator result_cb;
140
141   /**
142    * Closure for @e result_cb.
143    */
144   void *result_cls;
145
146   /**
147    * Local set used for the operation,
148    * NULL if no set has been provided by conclude yet.
149    */
150   struct GNUNET_SET_Handle *set;
151
152   /**
153    * Message sent to the server on calling conclude,
154    * NULL if conclude has been called.
155    */
156   struct GNUNET_MQ_Envelope *conclude_mqm;
157
158   /**
159    * Address of the request if in the conclude message,
160    * used to patch the request id into the message when the set is known.
161    */
162   uint32_t *request_id_addr;
163
164   /**
165    * Handles are kept in a linked list.
166    */
167   struct GNUNET_SET_OperationHandle *prev;
168
169   /**
170    * Handles are kept in a linked list.
171    */
172   struct GNUNET_SET_OperationHandle *next;
173
174   /**
175    * Request ID to identify the operation within the set.
176    */
177   uint32_t request_id;
178 };
179
180
181 /**
182  * Opaque handle to a listen operation.
183  */
184 struct GNUNET_SET_ListenHandle
185 {
186
187   /**
188    * Message queue for the client.
189    */
190   struct GNUNET_MQ_Handle* mq;
191
192   /**
193    * Configuration handle for the listener, stored
194    * here to be able to reconnect transparently on
195    * connection failure.
196    */
197   const struct GNUNET_CONFIGURATION_Handle *cfg;
198
199   /**
200    * Function to call on a new incoming request,
201    * or on error.
202    */
203   GNUNET_SET_ListenCallback listen_cb;
204
205   /**
206    * Closure for @e listen_cb.
207    */
208   void *listen_cls;
209
210   /**
211    * Application ID we listen for.
212    */
213   struct GNUNET_HashCode app_id;
214
215   /**
216    * Time to wait until we try to reconnect on failure.
217    */
218   struct GNUNET_TIME_Relative reconnect_backoff;
219
220   /**
221    * Task for reconnecting when the listener fails.
222    */
223   struct GNUNET_SCHEDULER_Task *reconnect_task;
224
225   /**
226    * Operation we listen for.
227    */
228   enum GNUNET_SET_OperationType operation;
229 };
230
231
232 /* mutual recursion with handle_copy_lazy */
233 static struct GNUNET_SET_Handle *
234 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
235                  enum GNUNET_SET_OperationType op,
236                  const uint32_t *cookie);
237
238
239 /**
240  * Handle element for iteration over the set.  Notifies the
241  * iterator and sends an acknowledgement to the service.
242  *
243  * @param cls the `struct GNUNET_SET_Handle *`
244  * @param msg the message
245  */
246 static void
247 handle_copy_lazy (void *cls,
248                   const struct GNUNET_SET_CopyLazyResponseMessage *msg)
249 {
250   struct GNUNET_SET_Handle *set = cls;
251   struct SetCopyRequest *req;
252   struct GNUNET_SET_Handle *new_set;
253
254   req = set->copy_req_head;
255   if (NULL == req)
256   {
257     /* Service sent us unsolicited lazy copy response */
258     GNUNET_break (0);
259     return;
260   }
261
262   LOG (GNUNET_ERROR_TYPE_DEBUG,
263        "Handling response to lazy copy\n");
264   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
265                                set->copy_req_tail,
266                                req);
267   // We pass none as operation here, since it doesn't matter when
268   // cloning.
269   new_set = create_internal (set->cfg,
270                              GNUNET_SET_OPERATION_NONE,
271                              &msg->cookie);
272   req->cb (req->cls, new_set);
273   GNUNET_free (req);
274 }
275
276
277 /**
278  * Check that the given @a msg is well-formed.
279  *
280  * @param cls closure
281  * @param msg message to check
282  * @return #GNUNET_OK if message is well-formed
283  */
284 static int
285 check_iter_element (void *cls,
286                     const struct GNUNET_SET_IterResponseMessage *msg)
287 {
288   /* minimum size was already checked, everything else is OK! */
289   return GNUNET_OK;
290 }
291
292
293 /**
294  * Handle element for iteration over the set.  Notifies the
295  * iterator and sends an acknowledgement to the service.
296  *
297  * @param cls the `struct GNUNET_SET_Handle *`
298  * @param mh the message
299  */
300 static void
301 handle_iter_element (void *cls,
302                      const struct GNUNET_SET_IterResponseMessage *msg)
303 {
304   struct GNUNET_SET_Handle *set = cls;
305   GNUNET_SET_ElementIterator iter = set->iterator;
306   struct GNUNET_SET_Element element;
307   struct GNUNET_SET_IterAckMessage *ack_msg;
308   struct GNUNET_MQ_Envelope *ev;
309   uint16_t msize;
310
311   msize = ntohs (msg->header.size);
312   if (set->iteration_id != ntohs (msg->iteration_id))
313   {
314     /* element from a previous iteration, skip! */
315     iter = NULL;
316   }
317   if (NULL != iter)
318   {
319     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
320     element.element_type = ntohs (msg->element_type);
321     element.data = &msg[1];
322     iter (set->iterator_cls,
323           &element);
324   }
325   ev = GNUNET_MQ_msg (ack_msg,
326                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
327   ack_msg->send_more = htonl ((NULL != iter));
328   GNUNET_MQ_send (set->mq, ev);
329 }
330
331
332 /**
333  * Handle message signalling conclusion of iteration over the set.
334  * Notifies the iterator that we are done.
335  *
336  * @param cls the set
337  * @param mh the message
338  */
339 static void
340 handle_iter_done (void *cls,
341                   const struct GNUNET_MessageHeader *mh)
342 {
343   struct GNUNET_SET_Handle *set = cls;
344   GNUNET_SET_ElementIterator iter = set->iterator;
345
346   if (NULL == iter)
347     return;
348   set->iterator = NULL;
349   set->iteration_id++;
350   iter (set->iterator_cls,
351         NULL);
352
353   if (GNUNET_YES == set->destroy_requested)
354     GNUNET_SET_destroy (set);
355 }
356
357
358 /**
359  * Check that the given @a msg is well-formed.
360  *
361  * @param cls closure
362  * @param msg message to check
363  * @return #GNUNET_OK if message is well-formed
364  */
365 static int
366 check_result (void *cls,
367               const struct GNUNET_SET_ResultMessage *msg)
368 {
369   /* minimum size was already checked, everything else is OK! */
370   return GNUNET_OK;
371 }
372
373
374 /**
375  * Handle result message for a set operation.
376  *
377  * @param cls the set
378  * @param mh the message
379  */
380 static void
381 handle_result (void *cls,
382                const struct GNUNET_SET_ResultMessage *msg)
383 {
384   struct GNUNET_SET_Handle *set = cls;
385   struct GNUNET_SET_OperationHandle *oh;
386   struct GNUNET_SET_Element e;
387   enum GNUNET_SET_Status result_status;
388   int destroy_set;
389
390   GNUNET_assert (NULL != set->mq);
391   result_status = ntohs (msg->result_status);
392   LOG (GNUNET_ERROR_TYPE_DEBUG,
393        "Got result message with status %d\n",
394        result_status);
395
396   oh = GNUNET_MQ_assoc_get (set->mq,
397                             ntohl (msg->request_id));
398   if (NULL == oh)
399   {
400     /* 'oh' can be NULL if we canceled the operation, but the service
401        did not get the cancel message yet. */
402     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403                 "Ignoring result from canceled operation\n");
404     return;
405   }
406
407   switch (result_status)
408   {
409     case GNUNET_SET_STATUS_OK:
410     case GNUNET_SET_STATUS_ADD_LOCAL:
411     case GNUNET_SET_STATUS_ADD_REMOTE:
412       goto do_element;
413     case GNUNET_SET_STATUS_FAILURE:
414     case GNUNET_SET_STATUS_DONE:
415       goto do_final;
416     case GNUNET_SET_STATUS_HALF_DONE:
417       /* not used anymore */
418       GNUNET_assert (0);
419   }
420
421 do_final:
422   LOG (GNUNET_ERROR_TYPE_DEBUG,
423        "Treating result as final status\n");
424   GNUNET_MQ_assoc_remove (set->mq,
425                           ntohl (msg->request_id));
426   GNUNET_CONTAINER_DLL_remove (set->ops_head,
427                                set->ops_tail,
428                                oh);
429   /* Need to do this calculation _before_ the result callback,
430      as IF the application still has a valid set handle, it
431      may trigger destruction of the set during the callback. */
432   destroy_set = (GNUNET_YES == set->destroy_requested) &&
433                 (NULL == set->ops_head);
434   if (NULL != oh->result_cb)
435   {
436     oh->result_cb (oh->result_cls,
437                    NULL,
438                    GNUNET_ntohll (msg->current_size),
439                    result_status);
440   }
441   else
442   {
443     LOG (GNUNET_ERROR_TYPE_DEBUG,
444          "No callback for final status\n");
445   }
446   if (destroy_set)
447     GNUNET_SET_destroy (set);
448   GNUNET_free (oh);
449   return;
450
451 do_element:
452   LOG (GNUNET_ERROR_TYPE_DEBUG,
453        "Treating result as element\n");
454   e.data = &msg[1];
455   e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
456   e.element_type = ntohs (msg->element_type);
457   if (NULL != oh->result_cb)
458     oh->result_cb (oh->result_cls,
459                    &e,
460                    GNUNET_ntohll (msg->current_size),
461                    result_status);
462 }
463
464
465 /**
466  * Destroy the given set operation.
467  *
468  * @param oh set operation to destroy
469  */
470 static void
471 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
472 {
473   struct GNUNET_SET_Handle *set = oh->set;
474   struct GNUNET_SET_OperationHandle *h_assoc;
475
476   if (NULL != oh->conclude_mqm)
477     GNUNET_MQ_discard (oh->conclude_mqm);
478   /* is the operation already commited? */
479   if (NULL != set)
480   {
481     GNUNET_CONTAINER_DLL_remove (set->ops_head,
482                                  set->ops_tail,
483                                  oh);
484     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
485                                       oh->request_id);
486     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
487   }
488   GNUNET_free (oh);
489 }
490
491
492 /**
493  * Cancel the given set operation.  We need to send an explicit cancel
494  * message, as all operations one one set communicate using one
495  * handle.
496  *
497  * @param oh set operation to cancel
498  */
499 void
500 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
501 {
502   struct GNUNET_SET_Handle *set = oh->set;
503   struct GNUNET_SET_CancelMessage *m;
504   struct GNUNET_MQ_Envelope *mqm;
505
506   if (NULL != set)
507   {
508     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
509     m->request_id = htonl (oh->request_id);
510     GNUNET_MQ_send (set->mq, mqm);
511   }
512   set_operation_destroy (oh);
513   if ( (NULL != set) &&
514        (GNUNET_YES == set->destroy_requested) &&
515        (NULL == set->ops_head) )
516   {
517     LOG (GNUNET_ERROR_TYPE_DEBUG,
518          "Destroying set after operation cancel\n");
519     GNUNET_SET_destroy (set);
520   }
521 }
522
523
524 /**
525  * We encountered an error communicating with the set service while
526  * performing a set operation. Report to the application.
527  *
528  * @param cls the `struct GNUNET_SET_Handle`
529  * @param error error code
530  */
531 static void
532 handle_client_set_error (void *cls,
533                          enum GNUNET_MQ_Error error)
534 {
535   struct GNUNET_SET_Handle *set = cls;
536   GNUNET_SET_ElementIterator iter = set->iterator;
537
538   LOG (GNUNET_ERROR_TYPE_ERROR,
539        "Handling client set error %d\n",
540        error);
541   while (NULL != set->ops_head)
542   {
543     if (NULL != set->ops_head->result_cb)
544       set->ops_head->result_cb (set->ops_head->result_cls,
545                                 NULL,
546                                 0,
547                                 GNUNET_SET_STATUS_FAILURE);
548     set_operation_destroy (set->ops_head);
549   }
550   set->iterator = NULL;
551   set->iteration_id++;
552   set->invalid = GNUNET_YES;
553   if (NULL != iter)
554     iter (set->iterator_cls,
555           NULL);
556 }
557
558
559 static struct GNUNET_SET_Handle *
560 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
561                  enum GNUNET_SET_OperationType op,
562                  const uint32_t *cookie)
563 {
564   struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
565   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
566     GNUNET_MQ_hd_var_size (result,
567                            GNUNET_MESSAGE_TYPE_SET_RESULT,
568                            struct GNUNET_SET_ResultMessage,
569                            set),
570     GNUNET_MQ_hd_var_size (iter_element,
571                            GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
572                            struct GNUNET_SET_IterResponseMessage,
573                            set),
574     GNUNET_MQ_hd_fixed_size (iter_done,
575                              GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
576                              struct GNUNET_MessageHeader,
577                              set),
578     GNUNET_MQ_hd_fixed_size (copy_lazy,
579                              GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
580                              struct GNUNET_SET_CopyLazyResponseMessage,
581                              set),
582     GNUNET_MQ_handler_end ()
583   };
584   struct GNUNET_MQ_Envelope *mqm;
585   struct GNUNET_SET_CreateMessage *create_msg;
586   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
587
588   set->cfg = cfg;
589   set->mq = GNUNET_CLIENT_connect (cfg,
590                                    "set",
591                                    mq_handlers,
592                                    &handle_client_set_error,
593                                    set);
594   if (NULL == set->mq)
595   {
596     GNUNET_free (set);
597     return NULL;
598   }
599   if (NULL == cookie)
600   {
601     LOG (GNUNET_ERROR_TYPE_DEBUG,
602          "Creating new set (operation %u)\n",
603          op);
604     mqm = GNUNET_MQ_msg (create_msg,
605                          GNUNET_MESSAGE_TYPE_SET_CREATE);
606     create_msg->operation = htonl (op);
607   }
608   else
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG,
611          "Creating new set (lazy copy)\n",
612          op);
613     mqm = GNUNET_MQ_msg (copy_msg,
614                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
615     copy_msg->cookie = *cookie;
616   }
617   GNUNET_MQ_send (set->mq, mqm);
618   return set;
619 }
620
621
622 /**
623  * Create an empty set, supporting the specified operation.
624  *
625  * @param cfg configuration to use for connecting to the
626  *        set service
627  * @param op operation supported by the set
628  *        Note that the operation has to be specified
629  *        beforehand, as certain set operations need to maintain
630  *        data structures spefific to the operation
631  * @return a handle to the set
632  */
633 struct GNUNET_SET_Handle *
634 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
635                    enum GNUNET_SET_OperationType op)
636 {
637   return create_internal (cfg, op, NULL);
638 }
639
640
641 /**
642  * Add an element to the given set.  After the element has been added
643  * (in the sense of being transmitted to the set service), @a cont
644  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
645  * queued.
646  *
647  * @param set set to add element to
648  * @param element element to add to the set
649  * @param cont continuation called after the element has been added
650  * @param cont_cls closure for @a cont
651  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
652  *         set is invalid (e.g. the set service crashed)
653  */
654 int
655 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
656                         const struct GNUNET_SET_Element *element,
657                         GNUNET_SET_Continuation cont,
658                         void *cont_cls)
659 {
660   struct GNUNET_MQ_Envelope *mqm;
661   struct GNUNET_SET_ElementMessage *msg;
662
663   LOG (GNUNET_ERROR_TYPE_INFO, "adding element of type %u\n", (unsigned) element->element_type);
664
665   if (GNUNET_YES == set->invalid)
666   {
667     if (NULL != cont)
668       cont (cont_cls);
669     return GNUNET_SYSERR;
670   }
671   mqm = GNUNET_MQ_msg_extra (msg,
672                              element->size,
673                              GNUNET_MESSAGE_TYPE_SET_ADD);
674   msg->element_type = htons (element->element_type);
675   GNUNET_memcpy (&msg[1],
676                  element->data,
677                  element->size);
678   GNUNET_MQ_notify_sent (mqm,
679                          cont, cont_cls);
680   GNUNET_MQ_send (set->mq, mqm);
681   return GNUNET_OK;
682 }
683
684
685 /**
686  * Remove an element to the given set.  After the element has been
687  * removed (in the sense of the request being transmitted to the set
688  * service), @a cont will be called.  Multiple calls to
689  * GNUNET_SET_remove_element() can be queued
690  *
691  * @param set set to remove element from
692  * @param element element to remove from the set
693  * @param cont continuation called after the element has been removed
694  * @param cont_cls closure for @a cont
695  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
696  *         set is invalid (e.g. the set service crashed)
697  */
698 int
699 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
700                            const struct GNUNET_SET_Element *element,
701                            GNUNET_SET_Continuation cont,
702                            void *cont_cls)
703 {
704   struct GNUNET_MQ_Envelope *mqm;
705   struct GNUNET_SET_ElementMessage *msg;
706
707   if (GNUNET_YES == set->invalid)
708   {
709     if (NULL != cont)
710       cont (cont_cls);
711     return GNUNET_SYSERR;
712   }
713   mqm = GNUNET_MQ_msg_extra (msg,
714                              element->size,
715                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
716   msg->element_type = htons (element->element_type);
717   GNUNET_memcpy (&msg[1],
718                  element->data,
719                  element->size);
720   GNUNET_MQ_notify_sent (mqm,
721                          cont, cont_cls);
722   GNUNET_MQ_send (set->mq, mqm);
723   return GNUNET_OK;
724 }
725
726
727 /**
728  * Destroy the set handle if no operations are left, mark the set
729  * for destruction otherwise.
730  *
731  * @param set set handle to destroy
732  */
733 void
734 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
735 {
736   /* destroying set while iterator is active is currently
737      not supported; we should expand the API to allow
738      clients to explicitly cancel the iteration! */
739   if ( (NULL != set->ops_head) || (NULL != set->iterator) )
740   {
741     LOG (GNUNET_ERROR_TYPE_DEBUG,
742          "Set operations are pending, delaying set destruction\n");
743     set->destroy_requested = GNUNET_YES;
744     return;
745   }
746   LOG (GNUNET_ERROR_TYPE_DEBUG,
747        "Really destroying set\n");
748   if (NULL != set->mq)
749   {
750     GNUNET_MQ_destroy (set->mq);
751     set->mq = NULL;
752   }
753   GNUNET_free (set);
754 }
755
756
757 /**
758  * Prepare a set operation to be evaluated with another peer.
759  * The evaluation will not start until the client provides
760  * a local set with #GNUNET_SET_commit().
761  *
762  * @param other_peer peer with the other set
763  * @param app_id hash for the application using the set
764  * @param context_msg additional information for the request
765  * @param result_mode specified how results will be returned,
766  *        see `enum GNUNET_SET_ResultMode`.
767  * @param result_cb called on error or success
768  * @param result_cls closure for @e result_cb
769  * @return a handle to cancel the operation
770  */
771 struct GNUNET_SET_OperationHandle *
772 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
773                     const struct GNUNET_HashCode *app_id,
774                     const struct GNUNET_MessageHeader *context_msg,
775                     enum GNUNET_SET_ResultMode result_mode,
776                     struct GNUNET_SET_Option options[],
777                     GNUNET_SET_ResultIterator result_cb,
778                     void *result_cls)
779 {
780   struct GNUNET_MQ_Envelope *mqm;
781   struct GNUNET_SET_OperationHandle *oh;
782   struct GNUNET_SET_EvaluateMessage *msg;
783   struct GNUNET_SET_Option *opt;
784
785   LOG (GNUNET_ERROR_TYPE_DEBUG,
786        "Client prepares set operation (%d)\n",
787        result_mode);
788   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
789   oh->result_cb = result_cb;
790   oh->result_cls = result_cls;
791   mqm = GNUNET_MQ_msg_nested_mh (msg,
792                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
793                                  context_msg);
794   msg->app_id = *app_id;
795   msg->result_mode = htonl (result_mode);
796   msg->target_peer = *other_peer;
797   for (opt = options; opt->type != 0; opt++)
798   {
799     switch (opt->type)
800     {
801       case GNUNET_SET_OPTION_BYZANTINE:
802         msg->byzantine = GNUNET_YES;
803         msg->byzantine_lower_bound = opt->v.num;
804         break;
805       case GNUNET_SET_OPTION_FORCE_FULL:
806         msg->force_full = GNUNET_YES;
807         break;
808       case GNUNET_SET_OPTION_FORCE_DELTA:
809         msg->force_delta = GNUNET_YES;
810         break;
811       default:
812         LOG (GNUNET_ERROR_TYPE_ERROR, 
813              "Option with type %d not recognized\n", (int) opt->type);
814     }
815   }
816   oh->conclude_mqm = mqm;
817   oh->request_id_addr = &msg->request_id;
818
819   return oh;
820 }
821
822
823 /**
824  * Connect to the set service in order to listen for requests.
825  *
826  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
827  */
828 static void
829 listen_connect (void *cls);
830
831
832 /**
833  * Check validity of request message for a listen operation
834  *
835  * @param cls the listen handle
836  * @param msg the message
837  * @return #GNUNET_OK if the message is well-formed
838  */
839 static int
840 check_request (void *cls,
841                const struct GNUNET_SET_RequestMessage *msg)
842 {
843   const struct GNUNET_MessageHeader *context_msg;
844
845   if (ntohs (msg->header.size) == sizeof (*msg))
846     return GNUNET_OK; /* no context message is OK */
847   context_msg = GNUNET_MQ_extract_nested_mh (msg);
848   if (NULL == context_msg)
849   {
850     /* malformed context message is NOT ok */
851     GNUNET_break_op (0);
852     return GNUNET_SYSERR;
853   }
854   return GNUNET_OK;
855 }
856
857
858 /**
859  * Handle request message for a listen operation
860  *
861  * @param cls the listen handle
862  * @param msg the message
863  */
864 static void
865 handle_request (void *cls,
866                 const struct GNUNET_SET_RequestMessage *msg)
867 {
868   struct GNUNET_SET_ListenHandle *lh = cls;
869   struct GNUNET_SET_Request req;
870   const struct GNUNET_MessageHeader *context_msg;
871   struct GNUNET_MQ_Envelope *mqm;
872   struct GNUNET_SET_RejectMessage *rmsg;
873
874   LOG (GNUNET_ERROR_TYPE_DEBUG,
875        "Processing incoming operation request\n");
876   /* we got another valid request => reset the backoff */
877   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
878   req.accept_id = ntohl (msg->accept_id);
879   req.accepted = GNUNET_NO;
880   context_msg = GNUNET_MQ_extract_nested_mh (msg);
881   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
882   lh->listen_cb (lh->listen_cls,
883                  &msg->peer_id,
884                  context_msg,
885                  &req);
886   if (GNUNET_YES == req.accepted)
887     return; /* the accept-case is handled in #GNUNET_SET_accept() */
888   LOG (GNUNET_ERROR_TYPE_DEBUG,
889        "Rejecting request\n");
890   mqm = GNUNET_MQ_msg (rmsg,
891                        GNUNET_MESSAGE_TYPE_SET_REJECT);
892   rmsg->accept_reject_id = msg->accept_id;
893   GNUNET_MQ_send (lh->mq, mqm);
894 }
895
896
897 /**
898  * Our connection with the set service encountered an error,
899  * re-initialize with exponential back-off.
900  *
901  * @param cls the `struct GNUNET_SET_ListenHandle *`
902  * @param error reason for the disconnect
903  */
904 static void
905 handle_client_listener_error (void *cls,
906                               enum GNUNET_MQ_Error error)
907 {
908   struct GNUNET_SET_ListenHandle *lh = cls;
909
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911        "Listener broke down (%d), re-connecting\n",
912        (int) error);
913   GNUNET_MQ_destroy (lh->mq);
914   lh->mq = NULL;
915   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
916                                                      &listen_connect,
917                                                      lh);
918   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
919 }
920
921
922 /**
923  * Connect to the set service in order to listen for requests.
924  *
925  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
926  */
927 static void
928 listen_connect (void *cls)
929 {
930   struct GNUNET_SET_ListenHandle *lh = cls;
931   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
932     GNUNET_MQ_hd_var_size (request,
933                            GNUNET_MESSAGE_TYPE_SET_REQUEST,
934                            struct GNUNET_SET_RequestMessage,
935                            lh),
936     GNUNET_MQ_handler_end ()
937   };
938   struct GNUNET_MQ_Envelope *mqm;
939   struct GNUNET_SET_ListenMessage *msg;
940
941   lh->reconnect_task = NULL;
942   GNUNET_assert (NULL == lh->mq);
943   lh->mq = GNUNET_CLIENT_connect (lh->cfg,
944                                   "set",
945                                   mq_handlers,
946                                   &handle_client_listener_error,
947                                   lh);
948   if (NULL == lh->mq)
949     return;
950   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
951   msg->operation = htonl (lh->operation);
952   msg->app_id = lh->app_id;
953   GNUNET_MQ_send (lh->mq,
954                   mqm);
955 }
956
957
958 /**
959  * Wait for set operation requests for the given application id
960  *
961  * @param cfg configuration to use for connecting to
962  *            the set service, needs to be valid for the lifetime of the listen handle
963  * @param operation operation we want to listen for
964  * @param app_id id of the application that handles set operation requests
965  * @param listen_cb called for each incoming request matching the operation
966  *                  and application id
967  * @param listen_cls handle for @a listen_cb
968  * @return a handle that can be used to cancel the listen operation
969  */
970 struct GNUNET_SET_ListenHandle *
971 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
972                    enum GNUNET_SET_OperationType operation,
973                    const struct GNUNET_HashCode *app_id,
974                    GNUNET_SET_ListenCallback listen_cb,
975                    void *listen_cls)
976 {
977   struct GNUNET_SET_ListenHandle *lh;
978
979   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
980   lh->listen_cb = listen_cb;
981   lh->listen_cls = listen_cls;
982   lh->cfg = cfg;
983   lh->operation = operation;
984   lh->app_id = *app_id;
985   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
986   listen_connect (lh);
987   if (NULL == lh->mq)
988   {
989     GNUNET_free (lh);
990     return NULL;
991   }
992   return lh;
993 }
994
995
996 /**
997  * Cancel the given listen operation.
998  *
999  * @param lh handle for the listen operation
1000  */
1001 void
1002 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
1003 {
1004   LOG (GNUNET_ERROR_TYPE_DEBUG,
1005        "Canceling listener\n");
1006   if (NULL != lh->mq)
1007   {
1008     GNUNET_MQ_destroy (lh->mq);
1009     lh->mq = NULL;
1010   }
1011   if (NULL != lh->reconnect_task)
1012   {
1013     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
1014     lh->reconnect_task = NULL;
1015   }
1016   GNUNET_free (lh);
1017 }
1018
1019
1020 /**
1021  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
1022  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
1023  * afterwards.
1024  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
1025  * and to begin the exchange with the remote peer.
1026  *
1027  * @param request request to accept
1028  * @param result_mode specified how results will be returned,
1029  *        see `enum GNUNET_SET_ResultMode`.
1030  * @param result_cb callback for the results
1031  * @param result_cls closure for @a result_cb
1032  * @return a handle to cancel the operation
1033  */
1034 struct GNUNET_SET_OperationHandle *
1035 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1036                    enum GNUNET_SET_ResultMode result_mode,
1037                    struct GNUNET_SET_Option options[],
1038                    GNUNET_SET_ResultIterator result_cb,
1039                    void *result_cls)
1040 {
1041   struct GNUNET_MQ_Envelope *mqm;
1042   struct GNUNET_SET_OperationHandle *oh;
1043   struct GNUNET_SET_AcceptMessage *msg;
1044
1045   GNUNET_assert (GNUNET_NO == request->accepted);
1046   LOG (GNUNET_ERROR_TYPE_DEBUG,
1047        "Client accepts set operation (%d)\n",
1048        result_mode);
1049   request->accepted = GNUNET_YES;
1050   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1051   msg->accept_reject_id = htonl (request->accept_id);
1052   msg->result_mode = htonl (result_mode);
1053   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1054   oh->result_cb = result_cb;
1055   oh->result_cls = result_cls;
1056   oh->conclude_mqm = mqm;
1057   oh->request_id_addr = &msg->request_id;
1058   return oh;
1059 }
1060
1061
1062 /**
1063  * Commit a set to be used with a set operation.
1064  * This function is called once we have fully constructed
1065  * the set that we want to use for the operation.  At this
1066  * time, the P2P protocol can then begin to exchange the
1067  * set information and call the result callback with the
1068  * result information.
1069  *
1070  * @param oh handle to the set operation
1071  * @param set the set to use for the operation
1072  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1073  *         set is invalid (e.g. the set service crashed)
1074  */
1075 int
1076 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1077                    struct GNUNET_SET_Handle *set)
1078 {
1079   if (NULL != oh->set)
1080   {
1081     /* Some other set was already commited for this
1082      * operation, there is a logic bug in the client of this API */
1083     GNUNET_break (0);
1084     return GNUNET_OK;
1085   }
1086   if (GNUNET_YES == set->invalid)
1087     return GNUNET_SYSERR;
1088   LOG (GNUNET_ERROR_TYPE_DEBUG,
1089        "Client commits to SET\n");
1090   GNUNET_assert (NULL != oh->conclude_mqm);
1091   oh->set = set;
1092   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1093                                set->ops_tail,
1094                                oh);
1095   oh->request_id = GNUNET_MQ_assoc_add (set->mq,
1096                                         oh);
1097   *oh->request_id_addr = htonl (oh->request_id);
1098   GNUNET_MQ_send (set->mq,
1099                   oh->conclude_mqm);
1100   oh->conclude_mqm = NULL;
1101   oh->request_id_addr = NULL;
1102   return GNUNET_OK;
1103 }
1104
1105
1106 /**
1107  * Iterate over all elements in the given set.  Note that this
1108  * operation involves transferring every element of the set from the
1109  * service to the client, and is thus costly.
1110  *
1111  * @param set the set to iterate over
1112  * @param iter the iterator to call for each element
1113  * @param iter_cls closure for @a iter
1114  * @return #GNUNET_YES if the iteration started successfuly,
1115  *         #GNUNET_NO if another iteration is active
1116  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1117  */
1118 int
1119 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1120                     GNUNET_SET_ElementIterator iter,
1121                     void *iter_cls)
1122 {
1123   struct GNUNET_MQ_Envelope *ev;
1124
1125   GNUNET_assert (NULL != iter);
1126   if (GNUNET_YES == set->invalid)
1127     return GNUNET_SYSERR;
1128   if (NULL != set->iterator)
1129     return GNUNET_NO;
1130   LOG (GNUNET_ERROR_TYPE_DEBUG,
1131        "Iterating over set\n");
1132   set->iterator = iter;
1133   set->iterator_cls = iter_cls;
1134   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1135   GNUNET_MQ_send (set->mq, ev);
1136   return GNUNET_YES;
1137 }
1138
1139
1140 void
1141 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1142                       GNUNET_SET_CopyReadyCallback cb,
1143                       void *cls)
1144 {
1145   struct GNUNET_MQ_Envelope *ev;
1146   struct SetCopyRequest *req;
1147
1148   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1149   GNUNET_MQ_send (set->mq, ev);
1150
1151   req = GNUNET_new (struct SetCopyRequest);
1152   req->cb = cb;
1153   req->cls = cls;
1154   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1155                                set->copy_req_tail,
1156                                req);
1157 }
1158
1159
1160 /**
1161  * Create a copy of an element.  The copy
1162  * must be GNUNET_free-d by the caller.
1163  *
1164  * @param element the element to copy
1165  * @return the copied element
1166  */
1167 struct GNUNET_SET_Element *
1168 GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
1169 {
1170   struct GNUNET_SET_Element *copy;
1171
1172   copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
1173   copy->size = element->size;
1174   copy->element_type = element->element_type;
1175   copy->data = &copy[1];
1176   GNUNET_memcpy (&copy[1],
1177                  element->data,
1178                  copy->size);
1179   return copy;
1180 }
1181
1182
1183 /**
1184  * Hash a set element.
1185  *
1186  * @param element the element that should be hashed
1187  * @param[out] ret_hash a pointer to where the hash of @a element
1188  *        should be stored
1189  */
1190 void
1191 GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
1192                          struct GNUNET_HashCode *ret_hash)
1193 {
1194   struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
1195
1196   /* It's not guaranteed that the element data is always after the element header,
1197      so we need to hash the chunks separately. */
1198   GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
1199   GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
1200   GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
1201   GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
1202 }
1203
1204 /* end of set_api.c */