343b5f8812887dca59d95d8ab95dac0873e1fab1
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_set_service.h"
31 #include "set.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35
36 struct SetCopyRequest
37 {
38   struct SetCopyRequest *next;
39
40   struct SetCopyRequest *prev;
41
42   void *cls;
43
44   GNUNET_SET_CopyReadyCallback cb;
45 };
46
47 /**
48  * Opaque handle to a set.
49  */
50 struct GNUNET_SET_Handle
51 {
52   /**
53    * Client connected to the set service.
54    */
55   struct GNUNET_CLIENT_Connection *client;
56
57   /**
58    * Message queue for @e client.
59    */
60   struct GNUNET_MQ_Handle *mq;
61
62   /**
63    * Linked list of operations on the set.
64    */
65   struct GNUNET_SET_OperationHandle *ops_head;
66
67   /**
68    * Linked list of operations on the set.
69    */
70   struct GNUNET_SET_OperationHandle *ops_tail;
71
72   /**
73    * Callback for the current iteration over the set,
74    * NULL if no iterator is active.
75    */
76   GNUNET_SET_ElementIterator iterator;
77
78   /**
79    * Closure for @e iterator
80    */
81   void *iterator_cls;
82
83   /**
84    * Should the set be destroyed once all operations are gone?
85    */
86   int destroy_requested;
87
88   /**
89    * Has the set become invalid (e.g. service died)?
90    */
91   int invalid;
92
93   /**
94    * Both client and service count the number of iterators
95    * created so far to match replies with iterators.
96    */
97   uint16_t iteration_id;
98
99   /**
100    * Configuration, needed when creating (lazy) copies.
101    */
102   const struct GNUNET_CONFIGURATION_Handle *cfg;
103
104   /**
105    * Doubly linked list of copy requests.
106    */
107   struct SetCopyRequest *copy_req_head;
108
109   /**
110    * Doubly linked list of copy requests.
111    */
112   struct SetCopyRequest *copy_req_tail;
113 };
114
115
116 /**
117  * Handle for a set operation request from another peer.
118  */
119 struct GNUNET_SET_Request
120 {
121   /**
122    * Id of the request, used to identify the request when
123    * accepting/rejecting it.
124    */
125   uint32_t accept_id;
126
127   /**
128    * Has the request been accepted already?
129    * #GNUNET_YES/#GNUNET_NO
130    */
131   int accepted;
132 };
133
134
135 /**
136  * Handle to an operation.  Only known to the service after committing
137  * the handle with a set.
138  */
139 struct GNUNET_SET_OperationHandle
140 {
141   /**
142    * Function to be called when we have a result,
143    * or an error.
144    */
145   GNUNET_SET_ResultIterator result_cb;
146
147   /**
148    * Closure for @e result_cb.
149    */
150   void *result_cls;
151
152   /**
153    * Local set used for the operation,
154    * NULL if no set has been provided by conclude yet.
155    */
156   struct GNUNET_SET_Handle *set;
157
158   /**
159    * Message sent to the server on calling conclude,
160    * NULL if conclude has been called.
161    */
162   struct GNUNET_MQ_Envelope *conclude_mqm;
163
164   /**
165    * Address of the request if in the conclude message,
166    * used to patch the request id into the message when the set is known.
167    */
168   uint32_t *request_id_addr;
169
170   /**
171    * Handles are kept in a linked list.
172    */
173   struct GNUNET_SET_OperationHandle *prev;
174
175   /**
176    * Handles are kept in a linked list.
177    */
178   struct GNUNET_SET_OperationHandle *next;
179
180   /**
181    * Request ID to identify the operation within the set.
182    */
183   uint32_t request_id;
184 };
185
186
187 /**
188  * Opaque handle to a listen operation.
189  */
190 struct GNUNET_SET_ListenHandle
191 {
192   /**
193    * Connection to the service.
194    */
195   struct GNUNET_CLIENT_Connection *client;
196
197   /**
198    * Message queue for the client.
199    */
200   struct GNUNET_MQ_Handle* mq;
201
202   /**
203    * Configuration handle for the listener, stored
204    * here to be able to reconnect transparently on
205    * connection failure.
206    */
207   const struct GNUNET_CONFIGURATION_Handle *cfg;
208
209   /**
210    * Function to call on a new incoming request,
211    * or on error.
212    */
213   GNUNET_SET_ListenCallback listen_cb;
214
215   /**
216    * Closure for @e listen_cb.
217    */
218   void *listen_cls;
219
220   /**
221    * Application ID we listen for.
222    */
223   struct GNUNET_HashCode app_id;
224
225   /**
226    * Time to wait until we try to reconnect on failure.
227    */
228   struct GNUNET_TIME_Relative reconnect_backoff;
229
230   /**
231    * Task for reconnecting when the listener fails.
232    */
233   struct GNUNET_SCHEDULER_Task *reconnect_task;
234
235   /**
236    * Operation we listen for.
237    */
238   enum GNUNET_SET_OperationType operation;
239 };
240
241
242 /* mutual recursion with handle_copy_lazy */
243 static struct GNUNET_SET_Handle *
244 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
245                  enum GNUNET_SET_OperationType op,
246                  const uint32_t *cookie);
247
248
249 /**
250  * Handle element for iteration over the set.  Notifies the
251  * iterator and sends an acknowledgement to the service.
252  *
253  * @param cls the `struct GNUNET_SET_Handle *`
254  * @param msg the message
255  */
256 static void
257 handle_copy_lazy (void *cls,
258                   const struct GNUNET_SET_CopyLazyResponseMessage *msg)
259 {
260   struct GNUNET_SET_Handle *set = cls;
261   struct SetCopyRequest *req;
262   struct GNUNET_SET_Handle *new_set;
263
264   req = set->copy_req_head;
265   if (NULL == req)
266   {
267     /* Service sent us unsolicited lazy copy response */
268     GNUNET_break (0);
269     return;
270   }
271
272   LOG (GNUNET_ERROR_TYPE_DEBUG,
273        "Handling response to lazy copy\n");
274   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
275                                set->copy_req_tail,
276                                req);
277   // We pass none as operation here, since it doesn't matter when
278   // cloning.
279   new_set = create_internal (set->cfg,
280                              GNUNET_SET_OPERATION_NONE,
281                              &msg->cookie);
282   req->cb (req->cls, new_set);
283   GNUNET_free (req);
284 }
285
286
287 /**
288  * Check that the given @a msg is well-formed.
289  *
290  * @param cls closure
291  * @param msg message to check
292  * @return #GNUNET_OK if message is well-formed
293  */
294 static int
295 check_iter_element (void *cls,
296                     const struct GNUNET_SET_IterResponseMessage *msg)
297 {
298   /* minimum size was already checked, everything else is OK! */
299   return GNUNET_OK;
300 }
301  
302
303 /**
304  * Handle element for iteration over the set.  Notifies the
305  * iterator and sends an acknowledgement to the service.
306  *
307  * @param cls the `struct GNUNET_SET_Handle *`
308  * @param mh the message
309  */
310  static void
311  handle_iter_element (void *cls,
312                       const struct GNUNET_SET_IterResponseMessage *msg)
313 {
314   struct GNUNET_SET_Handle *set = cls;
315   GNUNET_SET_ElementIterator iter = set->iterator;
316   struct GNUNET_SET_Element element;  
317   struct GNUNET_SET_IterAckMessage *ack_msg;
318   struct GNUNET_MQ_Envelope *ev;
319   uint16_t msize;
320
321   msize = ntohs (msg->header.size);
322   if (set->iteration_id != ntohs (msg->iteration_id))
323   {
324     /* element from a previous iteration, skip! */
325     iter = NULL;
326   }
327   if (NULL != iter)
328   {
329     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
330     element.element_type = ntohs (msg->element_type);
331     element.data = &msg[1];
332     iter (set->iterator_cls,
333           &element);
334   }
335   ev = GNUNET_MQ_msg (ack_msg,
336                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
337   ack_msg->send_more = htonl ((NULL != iter));
338   GNUNET_MQ_send (set->mq, ev);
339 }
340
341
342 /**
343  * Handle message signalling conclusion of iteration over the set.
344  * Notifies the iterator that we are done.
345  *
346  * @param cls the set
347  * @param mh the message
348  */
349 static void
350 handle_iter_done (void *cls,
351                   const struct GNUNET_MessageHeader *mh)
352 {
353   struct GNUNET_SET_Handle *set = cls;
354   GNUNET_SET_ElementIterator iter = set->iterator;
355
356   if (NULL == iter)
357     return;
358   set->iterator = NULL;
359   set->iteration_id++;
360   iter (set->iterator_cls,
361         NULL);
362 }
363
364
365 /**
366  * Check that the given @a msg is well-formed.
367  *
368  * @param cls closure
369  * @param msg message to check
370  * @return #GNUNET_OK if message is well-formed
371  */
372 static int
373 check_result (void *cls,
374               const struct GNUNET_SET_ResultMessage *msg)
375 {
376   /* minimum size was already checked, everything else is OK! */
377   return GNUNET_OK;
378 }
379
380
381 /**
382  * Handle result message for a set operation.
383  *
384  * @param cls the set
385  * @param mh the message
386  */
387 static void
388 handle_result (void *cls,
389                const struct GNUNET_SET_ResultMessage *msg)
390 {
391   struct GNUNET_SET_Handle *set = cls;
392   struct GNUNET_SET_OperationHandle *oh;
393   struct GNUNET_SET_Element e;
394   enum GNUNET_SET_Status result_status;
395
396   GNUNET_assert (NULL != set->mq);
397   result_status = ntohs (msg->result_status);
398   LOG (GNUNET_ERROR_TYPE_DEBUG,
399        "Got result message with status %d\n",
400        result_status);
401
402   oh = GNUNET_MQ_assoc_get (set->mq,
403                             ntohl (msg->request_id));
404   if (NULL == oh)
405   {
406     /* 'oh' can be NULL if we canceled the operation, but the service
407        did not get the cancel message yet. */
408     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
409                 "Ignoring result from canceled operation\n");
410     return;
411   }
412
413   switch (result_status)
414   {
415     case GNUNET_SET_STATUS_OK:
416     case GNUNET_SET_STATUS_ADD_LOCAL:
417     case GNUNET_SET_STATUS_ADD_REMOTE:
418       goto do_element;
419     case GNUNET_SET_STATUS_FAILURE:
420     case GNUNET_SET_STATUS_DONE:
421       goto do_final;
422     case GNUNET_SET_STATUS_HALF_DONE:
423       /* not used anymore */
424       GNUNET_assert (0);
425   }
426
427 do_final:
428   LOG (GNUNET_ERROR_TYPE_DEBUG,
429        "Treating result as final status\n");
430   GNUNET_MQ_assoc_remove (set->mq,
431                           ntohl (msg->request_id));
432   GNUNET_CONTAINER_DLL_remove (set->ops_head,
433                                set->ops_tail,
434                                oh);
435   if (NULL != oh->result_cb)
436   {
437     oh->result_cb (oh->result_cls,
438                    NULL,
439                    result_status);
440   }
441   else
442   {
443     LOG (GNUNET_ERROR_TYPE_DEBUG,
444          "No callback for final status\n");
445   }
446   if ( (GNUNET_YES == set->destroy_requested) &&
447        (NULL == set->ops_head) )
448     GNUNET_SET_destroy (set);
449   GNUNET_free (oh);
450   return;
451
452 do_element:
453   LOG (GNUNET_ERROR_TYPE_DEBUG,
454        "Treating result as element\n");
455   e.data = &msg[1];
456   e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
457   e.element_type = ntohs (msg->element_type);
458   if (NULL != oh->result_cb)
459     oh->result_cb (oh->result_cls,
460                    &e,
461                    result_status);
462 }
463
464
465 /**
466  * Destroy the given set operation.
467  *
468  * @param oh set operation to destroy
469  */
470 static void
471 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
472 {
473   struct GNUNET_SET_Handle *set = oh->set;
474   struct GNUNET_SET_OperationHandle *h_assoc;
475
476   if (NULL != oh->conclude_mqm)
477     GNUNET_MQ_discard (oh->conclude_mqm);
478   /* is the operation already commited? */
479   if (NULL != set)
480   {
481     GNUNET_CONTAINER_DLL_remove (set->ops_head,
482                                  set->ops_tail,
483                                  oh);
484     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
485                                       oh->request_id);
486     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
487   }
488   GNUNET_free (oh);
489 }
490
491
492 /**
493  * Cancel the given set operation.  We need to send an explicit cancel
494  * message, as all operations one one set communicate using one
495  * handle.
496  *
497  * @param oh set operation to cancel
498  */
499 void
500 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
501 {
502   struct GNUNET_SET_Handle *set = oh->set;
503   struct GNUNET_SET_CancelMessage *m;
504   struct GNUNET_MQ_Envelope *mqm;
505
506   if (NULL != set)
507   {
508     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
509     m->request_id = htonl (oh->request_id);
510     GNUNET_MQ_send (set->mq, mqm);
511   }
512   set_operation_destroy (oh);
513   if ( (NULL != set) &&
514        (GNUNET_YES == set->destroy_requested) &&
515        (NULL == set->ops_head) )
516   {
517     LOG (GNUNET_ERROR_TYPE_DEBUG,
518          "Destroying set after operation cancel\n");
519     GNUNET_SET_destroy (set);
520   }
521 }
522
523
524 /**
525  * We encountered an error communicating with the set service while
526  * performing a set operation. Report to the application.
527  *
528  * @param cls the `struct GNUNET_SET_Handle`
529  * @param error error code
530  */
531 static void
532 handle_client_set_error (void *cls,
533                          enum GNUNET_MQ_Error error)
534 {
535   struct GNUNET_SET_Handle *set = cls;
536   GNUNET_SET_ElementIterator iter = set->iterator;
537   
538   LOG (GNUNET_ERROR_TYPE_DEBUG,
539        "Handling client set error %d\n",
540        error);
541   while (NULL != set->ops_head)
542   {
543     if (NULL != set->ops_head->result_cb)
544       set->ops_head->result_cb (set->ops_head->result_cls,
545                                 NULL,
546                                 GNUNET_SET_STATUS_FAILURE);
547     set_operation_destroy (set->ops_head);
548   }
549   set->iterator = NULL;
550   set->iteration_id++;
551   if (NULL != iter)
552     iter (set->iterator_cls,
553           NULL);
554   set->invalid = GNUNET_YES;
555   if (GNUNET_YES == set->destroy_requested)
556   {
557     LOG (GNUNET_ERROR_TYPE_DEBUG,
558          "Destroying set after operation failure\n");
559     GNUNET_SET_destroy (set);
560   }
561 }
562
563
564 static struct GNUNET_SET_Handle *
565 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
566                  enum GNUNET_SET_OperationType op,
567                  const uint32_t *cookie)
568 {
569   GNUNET_MQ_hd_var_size (result,
570                          GNUNET_MESSAGE_TYPE_SET_RESULT,
571                          struct GNUNET_SET_ResultMessage);
572   GNUNET_MQ_hd_var_size (iter_element,
573                          GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
574                          struct GNUNET_SET_IterResponseMessage);
575   GNUNET_MQ_hd_fixed_size (iter_done,
576                            GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
577                            struct GNUNET_MessageHeader);
578   GNUNET_MQ_hd_fixed_size (copy_lazy,
579                            GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
580                            struct GNUNET_SET_CopyLazyResponseMessage);
581   struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
582   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
583     make_result_handler (set),
584     make_iter_element_handler (set),
585     make_iter_done_handler (set),
586     make_copy_lazy_handler (set),
587     GNUNET_MQ_handler_end ()
588   };
589   struct GNUNET_MQ_Envelope *mqm;
590   struct GNUNET_SET_CreateMessage *create_msg;
591   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
592
593   set->cfg = cfg;
594   set->client = GNUNET_CLIENT_connect ("set", cfg);
595   if (NULL == set->client)
596   {
597     GNUNET_free (set);
598     return NULL;
599   }
600   set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
601                                                    mq_handlers,
602                                                    &handle_client_set_error,
603                                                    set);
604   GNUNET_assert (NULL != set->mq);
605
606   if (NULL == cookie)
607   {
608     LOG (GNUNET_ERROR_TYPE_DEBUG,
609          "Creating new set (operation %u)\n",
610          op);
611     mqm = GNUNET_MQ_msg (create_msg,
612                          GNUNET_MESSAGE_TYPE_SET_CREATE);
613     create_msg->operation = htonl (op);
614   }
615   else
616   {
617     LOG (GNUNET_ERROR_TYPE_DEBUG,
618          "Creating new set (lazy copy)\n",
619          op);
620     mqm = GNUNET_MQ_msg (copy_msg,
621                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
622     copy_msg->cookie = *cookie;
623   }
624   GNUNET_MQ_send (set->mq, mqm);
625   return set;
626 }
627
628
629 /**
630  * Create an empty set, supporting the specified operation.
631  *
632  * @param cfg configuration to use for connecting to the
633  *        set service
634  * @param op operation supported by the set
635  *        Note that the operation has to be specified
636  *        beforehand, as certain set operations need to maintain
637  *        data structures spefific to the operation
638  * @return a handle to the set
639  */
640 struct GNUNET_SET_Handle *
641 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
642                    enum GNUNET_SET_OperationType op)
643 {
644   return create_internal (cfg, op, NULL);
645 }
646
647
648 /**
649  * Add an element to the given set.  After the element has been added
650  * (in the sense of being transmitted to the set service), @a cont
651  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
652  * queued.
653  *
654  * @param set set to add element to
655  * @param element element to add to the set
656  * @param cont continuation called after the element has been added
657  * @param cont_cls closure for @a cont
658  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
659  *         set is invalid (e.g. the set service crashed)
660  */
661 int
662 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
663                         const struct GNUNET_SET_Element *element,
664                         GNUNET_SET_Continuation cont,
665                         void *cont_cls)
666 {
667   struct GNUNET_MQ_Envelope *mqm;
668   struct GNUNET_SET_ElementMessage *msg;
669
670   if (GNUNET_YES == set->invalid)
671   {
672     if (NULL != cont)
673       cont (cont_cls);
674     return GNUNET_SYSERR;
675   }
676   mqm = GNUNET_MQ_msg_extra (msg, element->size,
677                              GNUNET_MESSAGE_TYPE_SET_ADD);
678   msg->element_type = htons (element->element_type);
679   memcpy (&msg[1],
680           element->data,
681           element->size);
682   GNUNET_MQ_notify_sent (mqm,
683                          cont, cont_cls);
684   GNUNET_MQ_send (set->mq, mqm);
685   return GNUNET_OK;
686 }
687
688
689 /**
690  * Remove an element to the given set.  After the element has been
691  * removed (in the sense of the request being transmitted to the set
692  * service), @a cont will be called.  Multiple calls to
693  * GNUNET_SET_remove_element() can be queued
694  *
695  * @param set set to remove element from
696  * @param element element to remove from the set
697  * @param cont continuation called after the element has been removed
698  * @param cont_cls closure for @a cont
699  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
700  *         set is invalid (e.g. the set service crashed)
701  */
702 int
703 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
704                            const struct GNUNET_SET_Element *element,
705                            GNUNET_SET_Continuation cont,
706                            void *cont_cls)
707 {
708   struct GNUNET_MQ_Envelope *mqm;
709   struct GNUNET_SET_ElementMessage *msg;
710
711   if (GNUNET_YES == set->invalid)
712   {
713     if (NULL != cont)
714       cont (cont_cls);
715     return GNUNET_SYSERR;
716   }
717   mqm = GNUNET_MQ_msg_extra (msg,
718                              element->size,
719                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
720   msg->element_type = htons (element->element_type);
721   memcpy (&msg[1],
722           element->data,
723           element->size);
724   GNUNET_MQ_notify_sent (mqm,
725                          cont, cont_cls);
726   GNUNET_MQ_send (set->mq, mqm);
727   return GNUNET_OK;
728 }
729
730
731 /**
732  * Destroy the set handle if no operations are left, mark the set
733  * for destruction otherwise.
734  *
735  * @param set set handle to destroy
736  */
737 void
738 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
739 {
740   /* destroying set while iterator is active is currently
741      not supported; we should expand the API to allow
742      clients to explicitly cancel the iteration! */
743   GNUNET_assert (NULL == set->iterator);
744   if (NULL != set->ops_head)
745   {
746     LOG (GNUNET_ERROR_TYPE_DEBUG,
747          "Set operations are pending, delaying set destruction\n");
748     set->destroy_requested = GNUNET_YES;
749     return;
750   }
751   LOG (GNUNET_ERROR_TYPE_DEBUG,
752        "Really destroying set\n");
753   if (NULL != set->client)
754   {
755     GNUNET_CLIENT_disconnect (set->client);
756     set->client = NULL;
757   }
758   if (NULL != set->mq)
759   {
760     GNUNET_MQ_destroy (set->mq);
761     set->mq = NULL;
762   }
763   GNUNET_free (set);
764 }
765
766
767 /**
768  * Prepare a set operation to be evaluated with another peer.
769  * The evaluation will not start until the client provides
770  * a local set with #GNUNET_SET_commit().
771  *
772  * @param other_peer peer with the other set
773  * @param app_id hash for the application using the set
774  * @param context_msg additional information for the request
775  * @param result_mode specified how results will be returned,
776  *        see `enum GNUNET_SET_ResultMode`.
777  * @param result_cb called on error or success
778  * @param result_cls closure for @e result_cb
779  * @return a handle to cancel the operation
780  */
781 struct GNUNET_SET_OperationHandle *
782 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
783                     const struct GNUNET_HashCode *app_id,
784                     const struct GNUNET_MessageHeader *context_msg,
785                     enum GNUNET_SET_ResultMode result_mode,
786                     GNUNET_SET_ResultIterator result_cb,
787                     void *result_cls)
788 {
789   struct GNUNET_MQ_Envelope *mqm;
790   struct GNUNET_SET_OperationHandle *oh;
791   struct GNUNET_SET_EvaluateMessage *msg;
792
793   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
794   oh->result_cb = result_cb;
795   oh->result_cls = result_cls;
796   mqm = GNUNET_MQ_msg_nested_mh (msg,
797                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
798                                  context_msg);
799   msg->app_id = *app_id;
800   msg->result_mode = htonl (result_mode);
801   msg->target_peer = *other_peer;
802   oh->conclude_mqm = mqm;
803   oh->request_id_addr = &msg->request_id;
804
805   return oh;
806 }
807
808
809 /**
810  * Connect to the set service in order to listen for requests.
811  *
812  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
813  */
814 static void
815 listen_connect (void *cls);
816
817
818 /**
819  * Check validity of request message for a listen operation
820  *
821  * @param cls the listen handle
822  * @param msg the message
823  * @return #GNUNET_OK if the message is well-formed
824  */
825 static int
826 check_request (void *cls,
827                const struct GNUNET_SET_RequestMessage *msg)
828 {
829   const struct GNUNET_MessageHeader *context_msg;
830
831   context_msg = GNUNET_MQ_extract_nested_mh (msg);
832   if (NULL == context_msg)
833   {
834     GNUNET_break_op (0);
835     return GNUNET_SYSERR;
836   }
837   return GNUNET_OK;
838 }
839
840
841 /**
842  * Handle request message for a listen operation
843  *
844  * @param cls the listen handle
845  * @param msg the message
846  */
847 static void
848 handle_request (void *cls,
849                 const struct GNUNET_SET_RequestMessage *msg)
850 {
851   struct GNUNET_SET_ListenHandle *lh = cls;
852   struct GNUNET_SET_Request req;
853   const struct GNUNET_MessageHeader *context_msg;
854   struct GNUNET_MQ_Envelope *mqm;
855   struct GNUNET_SET_RejectMessage *rmsg;
856
857   LOG (GNUNET_ERROR_TYPE_DEBUG,
858        "Processing incoming operation request\n");
859   /* we got another valid request => reset the backoff */
860   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
861   req.accept_id = ntohl (msg->accept_id);
862   req.accepted = GNUNET_NO;
863   context_msg = GNUNET_MQ_extract_nested_mh (msg);
864   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
865   lh->listen_cb (lh->listen_cls,
866                  &msg->peer_id,
867                  context_msg,
868                  &req);
869   if (GNUNET_YES == req.accepted)
870     return; /* the accept-case is handled in #GNUNET_SET_accept() */
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872        "Rejecting request\n");
873   mqm = GNUNET_MQ_msg (rmsg,
874                        GNUNET_MESSAGE_TYPE_SET_REJECT);
875   rmsg->accept_reject_id = msg->accept_id;
876   GNUNET_MQ_send (lh->mq, mqm);
877 }
878
879
880 /**
881  * Our connection with the set service encountered an error,
882  * re-initialize with exponential back-off.
883  *
884  * @param cls the `struct GNUNET_SET_ListenHandle *`
885  * @param error reason for the disconnect
886  */
887 static void
888 handle_client_listener_error (void *cls,
889                               enum GNUNET_MQ_Error error)
890 {
891   struct GNUNET_SET_ListenHandle *lh = cls;
892
893   LOG (GNUNET_ERROR_TYPE_DEBUG,
894        "Listener broke down (%d), re-connecting\n",
895        (int) error);
896   GNUNET_CLIENT_disconnect (lh->client);
897   lh->client = NULL;
898   GNUNET_MQ_destroy (lh->mq);
899   lh->mq = NULL;
900   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
901                                                      &listen_connect,
902                                                      lh);
903   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
904 }
905
906
907 /**
908  * Connect to the set service in order to listen for requests.
909  *
910  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
911  */
912 static void
913 listen_connect (void *cls)
914
915   GNUNET_MQ_hd_var_size (request,
916                          GNUNET_MESSAGE_TYPE_SET_REQUEST,
917                          struct GNUNET_SET_RequestMessage);
918   struct GNUNET_SET_ListenHandle *lh = cls;
919   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
920     make_request_handler (lh),
921     GNUNET_MQ_handler_end ()
922   };
923   struct GNUNET_MQ_Envelope *mqm;
924   struct GNUNET_SET_ListenMessage *msg;
925
926   lh->reconnect_task = NULL;
927   GNUNET_assert (NULL == lh->client);
928   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
929   if (NULL == lh->client)
930     return;
931   GNUNET_assert (NULL == lh->mq);
932   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
933                                                   mq_handlers,
934                                                   &handle_client_listener_error,
935                                                   lh);
936   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
937   msg->operation = htonl (lh->operation);
938   msg->app_id = lh->app_id;
939   GNUNET_MQ_send (lh->mq, mqm);
940 }
941
942
943 /**
944  * Wait for set operation requests for the given application id
945  *
946  * @param cfg configuration to use for connecting to
947  *            the set service, needs to be valid for the lifetime of the listen handle
948  * @param operation operation we want to listen for
949  * @param app_id id of the application that handles set operation requests
950  * @param listen_cb called for each incoming request matching the operation
951  *                  and application id
952  * @param listen_cls handle for @a listen_cb
953  * @return a handle that can be used to cancel the listen operation
954  */
955 struct GNUNET_SET_ListenHandle *
956 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
957                    enum GNUNET_SET_OperationType operation,
958                    const struct GNUNET_HashCode *app_id,
959                    GNUNET_SET_ListenCallback listen_cb,
960                    void *listen_cls)
961 {
962   struct GNUNET_SET_ListenHandle *lh;
963
964   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
965   lh->listen_cb = listen_cb;
966   lh->listen_cls = listen_cls;
967   lh->cfg = cfg;
968   lh->operation = operation;
969   lh->app_id = *app_id;
970   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
971   listen_connect (lh);
972   if (NULL == lh->client)
973   {
974     GNUNET_free (lh);
975     return NULL;
976   }
977   return lh;
978 }
979
980
981 /**
982  * Cancel the given listen operation.
983  *
984  * @param lh handle for the listen operation
985  */
986 void
987 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
988 {
989   LOG (GNUNET_ERROR_TYPE_DEBUG,
990        "Canceling listener\n");
991   if (NULL != lh->mq)
992   {
993     GNUNET_MQ_destroy (lh->mq);
994     lh->mq = NULL;
995   }
996   if (NULL != lh->client)
997   {
998     GNUNET_CLIENT_disconnect (lh->client);
999     lh->client = NULL;
1000   }
1001   if (NULL != lh->reconnect_task)
1002   {
1003     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
1004     lh->reconnect_task = NULL;
1005   }
1006   GNUNET_free (lh);
1007 }
1008
1009
1010 /**
1011  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
1012  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
1013  * afterwards.
1014  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
1015  * and to begin the exchange with the remote peer.
1016  *
1017  * @param request request to accept
1018  * @param result_mode specified how results will be returned,
1019  *        see `enum GNUNET_SET_ResultMode`.
1020  * @param result_cb callback for the results
1021  * @param result_cls closure for @a result_cb
1022  * @return a handle to cancel the operation
1023  */
1024 struct GNUNET_SET_OperationHandle *
1025 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1026                    enum GNUNET_SET_ResultMode result_mode,
1027                    GNUNET_SET_ResultIterator result_cb,
1028                    void *result_cls)
1029 {
1030   struct GNUNET_MQ_Envelope *mqm;
1031   struct GNUNET_SET_OperationHandle *oh;
1032   struct GNUNET_SET_AcceptMessage *msg;
1033
1034   GNUNET_assert (GNUNET_NO == request->accepted);
1035   request->accepted = GNUNET_YES;
1036   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1037   msg->accept_reject_id = htonl (request->accept_id);
1038   msg->result_mode = htonl (result_mode);
1039   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1040   oh->result_cb = result_cb;
1041   oh->result_cls = result_cls;
1042   oh->conclude_mqm = mqm;
1043   oh->request_id_addr = &msg->request_id;
1044   return oh;
1045 }
1046
1047
1048 /**
1049  * Commit a set to be used with a set operation.
1050  * This function is called once we have fully constructed
1051  * the set that we want to use for the operation.  At this
1052  * time, the P2P protocol can then begin to exchange the
1053  * set information and call the result callback with the
1054  * result information.
1055  *
1056  * @param oh handle to the set operation
1057  * @param set the set to use for the operation
1058  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1059  *         set is invalid (e.g. the set service crashed)
1060  */
1061 int
1062 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1063                    struct GNUNET_SET_Handle *set)
1064 {
1065   if (NULL != oh->set)
1066   {
1067     /* Some other set was already commited for this
1068      * operation, there is a logic bug in the client of this API */
1069     GNUNET_break (0);
1070     return GNUNET_OK;
1071   }
1072   if (GNUNET_YES == set->invalid)
1073     return GNUNET_SYSERR;
1074   GNUNET_assert (NULL != oh->conclude_mqm);
1075   oh->set = set;
1076   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1077                                set->ops_tail,
1078                                oh);
1079   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
1080   *oh->request_id_addr = htonl (oh->request_id);
1081   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
1082   oh->conclude_mqm = NULL;
1083   oh->request_id_addr = NULL;
1084   return GNUNET_OK;
1085 }
1086
1087
1088 /**
1089  * Iterate over all elements in the given set.  Note that this
1090  * operation involves transferring every element of the set from the
1091  * service to the client, and is thus costly.
1092  *
1093  * @param set the set to iterate over
1094  * @param iter the iterator to call for each element
1095  * @param iter_cls closure for @a iter
1096  * @return #GNUNET_YES if the iteration started successfuly,
1097  *         #GNUNET_NO if another iteration is active
1098  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1099  */
1100 int
1101 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1102                     GNUNET_SET_ElementIterator iter,
1103                     void *iter_cls)
1104 {
1105   struct GNUNET_MQ_Envelope *ev;
1106
1107   GNUNET_assert (NULL != iter);
1108   if (GNUNET_YES == set->invalid)
1109     return GNUNET_SYSERR;
1110   if (NULL != set->iterator)
1111     return GNUNET_NO;
1112   LOG (GNUNET_ERROR_TYPE_DEBUG,
1113        "Iterating over set\n");
1114   set->iterator = iter;
1115   set->iterator_cls = iter_cls;
1116   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1117   GNUNET_MQ_send (set->mq, ev);
1118   return GNUNET_YES;
1119 }
1120
1121
1122 void
1123 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1124                       GNUNET_SET_CopyReadyCallback cb,
1125                       void *cls)
1126 {
1127   struct GNUNET_MQ_Envelope *ev;
1128   struct SetCopyRequest *req;
1129
1130   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1131   GNUNET_MQ_send (set->mq, ev);
1132
1133   req = GNUNET_new (struct SetCopyRequest);
1134   req->cb = cb;
1135   req->cls = cls;
1136   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1137                                set->copy_req_tail,
1138                                req);
1139 }
1140
1141
1142 /**
1143  * Create a copy of an element.  The copy
1144  * must be GNUNET_free-d by the caller.
1145  *
1146  * @param element the element to copy
1147  * @return the copied element
1148  */
1149 struct GNUNET_SET_Element *
1150 GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
1151 {
1152   struct GNUNET_SET_Element *copy;
1153
1154   copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
1155   copy->size = element->size;
1156   copy->element_type = element->element_type;
1157   copy->data = &copy[1];
1158   memcpy ((void *) copy->data, element->data, copy->size);
1159
1160   return copy;
1161 }
1162
1163
1164 /**
1165  * Hash a set element.
1166  *
1167  * @param element the element that should be hashed
1168  * @param[out] ret_hash a pointer to where the hash of @a element
1169  *        should be stored
1170  */
1171 void
1172 GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
1173                          struct GNUNET_HashCode *ret_hash)
1174 {
1175   struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
1176
1177   /* It's not guaranteed that the element data is always after the element header,
1178      so we need to hash the chunks separately. */
1179   GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
1180   GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
1181   GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
1182   GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
1183 }
1184
1185 /* end of set_api.c */