small API change: do no longer pass rarely needed GNUNET_SCHEDULER_TaskContext to...
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_set_service.h"
31 #include "set.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35
36 struct SetCopyRequest
37 {
38   struct SetCopyRequest *next;
39
40   struct SetCopyRequest *prev;
41
42   void *cls;
43
44   GNUNET_SET_CopyReadyCallback cb;
45 };
46
47 /**
48  * Opaque handle to a set.
49  */
50 struct GNUNET_SET_Handle
51 {
52   /**
53    * Client connected to the set service.
54    */
55   struct GNUNET_CLIENT_Connection *client;
56
57   /**
58    * Message queue for @e client.
59    */
60   struct GNUNET_MQ_Handle *mq;
61
62   /**
63    * Linked list of operations on the set.
64    */
65   struct GNUNET_SET_OperationHandle *ops_head;
66
67   /**
68    * Linked list of operations on the set.
69    */
70   struct GNUNET_SET_OperationHandle *ops_tail;
71
72   /**
73    * Callback for the current iteration over the set,
74    * NULL if no iterator is active.
75    */
76   GNUNET_SET_ElementIterator iterator;
77
78   /**
79    * Closure for @e iterator
80    */
81   void *iterator_cls;
82
83   /**
84    * Should the set be destroyed once all operations are gone?
85    */
86   int destroy_requested;
87
88   /**
89    * Has the set become invalid (e.g. service died)?
90    */
91   int invalid;
92
93   /**
94    * Both client and service count the number of iterators
95    * created so far to match replies with iterators.
96    */
97   uint16_t iteration_id;
98
99   /**
100    * Configuration, needed when creating (lazy) copies.
101    */
102   const struct GNUNET_CONFIGURATION_Handle *cfg;
103
104   /**
105    * Doubly linked list of copy requests.
106    */
107   struct SetCopyRequest *copy_req_head;
108
109   /**
110    * Doubly linked list of copy requests.
111    */
112   struct SetCopyRequest *copy_req_tail;
113 };
114
115
116 /**
117  * Handle for a set operation request from another peer.
118  */
119 struct GNUNET_SET_Request
120 {
121   /**
122    * Id of the request, used to identify the request when
123    * accepting/rejecting it.
124    */
125   uint32_t accept_id;
126
127   /**
128    * Has the request been accepted already?
129    * #GNUNET_YES/#GNUNET_NO
130    */
131   int accepted;
132 };
133
134
135 /**
136  * Handle to an operation.  Only known to the service after committing
137  * the handle with a set.
138  */
139 struct GNUNET_SET_OperationHandle
140 {
141   /**
142    * Function to be called when we have a result,
143    * or an error.
144    */
145   GNUNET_SET_ResultIterator result_cb;
146
147   /**
148    * Closure for @e result_cb.
149    */
150   void *result_cls;
151
152   /**
153    * Local set used for the operation,
154    * NULL if no set has been provided by conclude yet.
155    */
156   struct GNUNET_SET_Handle *set;
157
158   /**
159    * Message sent to the server on calling conclude,
160    * NULL if conclude has been called.
161    */
162   struct GNUNET_MQ_Envelope *conclude_mqm;
163
164   /**
165    * Address of the request if in the conclude message,
166    * used to patch the request id into the message when the set is known.
167    */
168   uint32_t *request_id_addr;
169
170   /**
171    * Handles are kept in a linked list.
172    */
173   struct GNUNET_SET_OperationHandle *prev;
174
175   /**
176    * Handles are kept in a linked list.
177    */
178   struct GNUNET_SET_OperationHandle *next;
179
180   /**
181    * Request ID to identify the operation within the set.
182    */
183   uint32_t request_id;
184 };
185
186
187 /**
188  * Opaque handle to a listen operation.
189  */
190 struct GNUNET_SET_ListenHandle
191 {
192   /**
193    * Connection to the service.
194    */
195   struct GNUNET_CLIENT_Connection *client;
196
197   /**
198    * Message queue for the client.
199    */
200   struct GNUNET_MQ_Handle* mq;
201
202   /**
203    * Configuration handle for the listener, stored
204    * here to be able to reconnect transparently on
205    * connection failure.
206    */
207   const struct GNUNET_CONFIGURATION_Handle *cfg;
208
209   /**
210    * Function to call on a new incoming request,
211    * or on error.
212    */
213   GNUNET_SET_ListenCallback listen_cb;
214
215   /**
216    * Closure for @e listen_cb.
217    */
218   void *listen_cls;
219
220   /**
221    * Application ID we listen for.
222    */
223   struct GNUNET_HashCode app_id;
224
225   /**
226    * Time to wait until we try to reconnect on failure.
227    */
228   struct GNUNET_TIME_Relative reconnect_backoff;
229
230   /**
231    * Task for reconnecting when the listener fails.
232    */
233   struct GNUNET_SCHEDULER_Task * reconnect_task;
234
235   /**
236    * Operation we listen for.
237    */
238   enum GNUNET_SET_OperationType operation;
239 };
240
241
242 /* mutual recursion with handle_copy_lazy */
243 static struct GNUNET_SET_Handle *
244 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
245                  enum GNUNET_SET_OperationType op,
246                  uint32_t *cookie);
247
248
249 /**
250  * Handle element for iteration over the set.  Notifies the
251  * iterator and sends an acknowledgement to the service.
252  *
253  * @param cls the `struct GNUNET_SET_Handle *`
254  * @param mh the message
255  */
256 static void
257 handle_copy_lazy (void *cls,
258                   const struct GNUNET_MessageHeader *mh)
259 {
260   struct GNUNET_SET_CopyLazyResponseMessage *msg;
261   struct GNUNET_SET_Handle *set = cls;
262   struct SetCopyRequest *req;
263   struct GNUNET_SET_Handle *new_set;
264
265   msg = (struct GNUNET_SET_CopyLazyResponseMessage *) mh;
266
267   req = set->copy_req_head;
268
269   if (NULL == req)
270   {
271     /* Service sent us unsolicited lazy copy response */
272     GNUNET_break (0);
273     return;
274   }
275
276   LOG (GNUNET_ERROR_TYPE_DEBUG,
277        "Handling response to lazy copy\n");
278
279   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
280                                set->copy_req_tail,
281                                req);
282
283
284   // We pass none as operation here, since it doesn't matter when
285   // cloning.
286   new_set = create_internal (set->cfg, GNUNET_SET_OPERATION_NONE, &msg->cookie);
287
288   req->cb (req->cls, new_set);
289
290   GNUNET_free (req);
291 }
292
293
294 /**
295  * Handle element for iteration over the set.  Notifies the
296  * iterator and sends an acknowledgement to the service.
297  *
298  * @param cls the `struct GNUNET_SET_Handle *`
299  * @param mh the message
300  */
301 static void
302 handle_iter_element (void *cls,
303                      const struct GNUNET_MessageHeader *mh)
304 {
305   struct GNUNET_SET_Handle *set = cls;
306   GNUNET_SET_ElementIterator iter = set->iterator;
307   struct GNUNET_SET_Element element;
308   const struct GNUNET_SET_IterResponseMessage *msg;
309   struct GNUNET_SET_IterAckMessage *ack_msg;
310   struct GNUNET_MQ_Envelope *ev;
311   uint16_t msize;
312
313   msize = ntohs (mh->size);
314   if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
315   {
316     /* message malformed */
317     GNUNET_break (0);
318     set->iterator = NULL;
319     set->iteration_id++;
320     iter (set->iterator_cls,
321           NULL);
322     iter = NULL;
323   }
324   msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
325   if (set->iteration_id != ntohs (msg->iteration_id))
326   {
327     /* element from a previous iteration, skip! */
328     iter = NULL;
329   }
330   if (NULL != iter)
331   {
332     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
333     element.element_type = ntohs (msg->element_type);
334     element.data = &msg[1];
335     iter (set->iterator_cls,
336           &element);
337   }
338   ev = GNUNET_MQ_msg (ack_msg,
339                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
340   ack_msg->send_more = htonl ((NULL != iter));
341   GNUNET_MQ_send (set->mq, ev);
342 }
343
344
345 /**
346  * Handle message signalling conclusion of iteration over the set.
347  * Notifies the iterator that we are done.
348  *
349  * @param cls the set
350  * @param mh the message
351  */
352 static void
353 handle_iter_done (void *cls,
354                   const struct GNUNET_MessageHeader *mh)
355 {
356   struct GNUNET_SET_Handle *set = cls;
357   GNUNET_SET_ElementIterator iter = set->iterator;
358
359   if (NULL == iter)
360     return;
361   set->iterator = NULL;
362   set->iteration_id++;
363   iter (set->iterator_cls,
364         NULL);
365 }
366
367
368 /**
369  * Handle result message for a set operation.
370  *
371  * @param cls the set
372  * @param mh the message
373  */
374 static void
375 handle_result (void *cls,
376                const struct GNUNET_MessageHeader *mh)
377 {
378   struct GNUNET_SET_Handle *set = cls;
379   const struct GNUNET_SET_ResultMessage *msg;
380   struct GNUNET_SET_OperationHandle *oh;
381   struct GNUNET_SET_Element e;
382   enum GNUNET_SET_Status result_status;
383
384   msg = (const struct GNUNET_SET_ResultMessage *) mh;
385   GNUNET_assert (NULL != set->mq);
386   result_status = ntohs (msg->result_status);
387   LOG (GNUNET_ERROR_TYPE_DEBUG,
388        "Got result message with status %d\n",
389        result_status);
390
391   oh = GNUNET_MQ_assoc_get (set->mq,
392                             ntohl (msg->request_id));
393   if (NULL == oh)
394   {
395     /* 'oh' can be NULL if we canceled the operation, but the service
396        did not get the cancel message yet. */
397     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
398                 "Ignoring result from canceled operation\n");
399     return;
400   }
401
402   switch (result_status)
403   {
404     case GNUNET_SET_STATUS_OK:
405     case GNUNET_SET_STATUS_ADD_LOCAL:
406     case GNUNET_SET_STATUS_ADD_REMOTE:
407       goto do_element;
408     case GNUNET_SET_STATUS_FAILURE:
409     case GNUNET_SET_STATUS_DONE:
410       goto do_final;
411     case GNUNET_SET_STATUS_HALF_DONE:
412       /* not used anymore */
413       GNUNET_assert (0);
414   }
415
416 do_final:
417   LOG (GNUNET_ERROR_TYPE_DEBUG,
418        "Treating result as final status\n");
419   GNUNET_MQ_assoc_remove (set->mq,
420                           ntohl (msg->request_id));
421   GNUNET_CONTAINER_DLL_remove (set->ops_head,
422                                set->ops_tail,
423                                oh);
424   if (NULL != oh->result_cb)
425   {
426     oh->result_cb (oh->result_cls,
427                    NULL,
428                    result_status);
429   }
430   else
431   {
432     LOG (GNUNET_ERROR_TYPE_DEBUG,
433          "No callback for final status\n");
434   }
435   if ( (GNUNET_YES == set->destroy_requested) &&
436        (NULL == set->ops_head) )
437     GNUNET_SET_destroy (set);
438   GNUNET_free (oh);
439   return;
440
441 do_element:
442   LOG (GNUNET_ERROR_TYPE_DEBUG,
443        "Treating result as element\n");
444   e.data = &msg[1];
445   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
446   e.element_type = ntohs (msg->element_type);
447   if (NULL != oh->result_cb)
448     oh->result_cb (oh->result_cls,
449                    &e,
450                    result_status);
451 }
452
453
454 /**
455  * Destroy the given set operation.
456  *
457  * @param oh set operation to destroy
458  */
459 static void
460 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
461 {
462   struct GNUNET_SET_Handle *set = oh->set;
463   struct GNUNET_SET_OperationHandle *h_assoc;
464
465   if (NULL != oh->conclude_mqm)
466     GNUNET_MQ_discard (oh->conclude_mqm);
467   /* is the operation already commited? */
468   if (NULL != set)
469   {
470     GNUNET_CONTAINER_DLL_remove (set->ops_head,
471                                  set->ops_tail,
472                                  oh);
473     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
474                                       oh->request_id);
475     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
476   }
477   GNUNET_free (oh);
478 }
479
480
481 /**
482  * Cancel the given set operation.  We need to send an explicit cancel
483  * message, as all operations one one set communicate using one
484  * handle.
485  *
486  * @param oh set operation to cancel
487  */
488 void
489 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
490 {
491   struct GNUNET_SET_Handle *set = oh->set;
492   struct GNUNET_SET_CancelMessage *m;
493   struct GNUNET_MQ_Envelope *mqm;
494
495   if (NULL != set)
496   {
497     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
498     m->request_id = htonl (oh->request_id);
499     GNUNET_MQ_send (set->mq, mqm);
500   }
501   set_operation_destroy (oh);
502   if ( (NULL != set) &&
503        (GNUNET_YES == set->destroy_requested) &&
504        (NULL == set->ops_head) )
505   {
506     LOG (GNUNET_ERROR_TYPE_DEBUG,
507          "Destroying set after operation cancel\n");
508     GNUNET_SET_destroy (set);
509   }
510 }
511
512
513 /**
514  * We encountered an error communicating with the set service while
515  * performing a set operation. Report to the application.
516  *
517  * @param cls the `struct GNUNET_SET_Handle`
518  * @param error error code
519  */
520 static void
521 handle_client_set_error (void *cls,
522                          enum GNUNET_MQ_Error error)
523 {
524   struct GNUNET_SET_Handle *set = cls;
525
526   LOG (GNUNET_ERROR_TYPE_DEBUG,
527        "Handling client set error %d\n",
528        error);
529   while (NULL != set->ops_head)
530   {
531     if (NULL != set->ops_head->result_cb)
532       set->ops_head->result_cb (set->ops_head->result_cls,
533                                 NULL,
534                                 GNUNET_SET_STATUS_FAILURE);
535     set_operation_destroy (set->ops_head);
536   }
537   set->invalid = GNUNET_YES;
538   if (GNUNET_YES == set->destroy_requested)
539   {
540     LOG (GNUNET_ERROR_TYPE_DEBUG,
541          "Destroying set after operation failure\n");
542     GNUNET_SET_destroy (set);
543   }
544 }
545
546
547 static struct GNUNET_SET_Handle *
548 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
549                  enum GNUNET_SET_OperationType op,
550                  uint32_t *cookie)
551 {
552   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
553     { &handle_result,
554       GNUNET_MESSAGE_TYPE_SET_RESULT,
555       0 },
556     { &handle_iter_element,
557       GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
558       0 },
559     { &handle_iter_done,
560       GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
561       sizeof (struct GNUNET_MessageHeader) },
562     { &handle_copy_lazy,
563       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
564       sizeof (struct GNUNET_SET_CopyLazyResponseMessage) },
565     GNUNET_MQ_HANDLERS_END
566   };
567   struct GNUNET_SET_Handle *set;
568   struct GNUNET_MQ_Envelope *mqm;
569   struct GNUNET_SET_CreateMessage *create_msg;
570   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
571
572   set = GNUNET_new (struct GNUNET_SET_Handle);
573   set->client = GNUNET_CLIENT_connect ("set", cfg);
574   set->cfg = cfg;
575   if (NULL == set->client)
576   {
577     GNUNET_free (set);
578     return NULL;
579   }
580   set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
581                                                    mq_handlers,
582                                                    &handle_client_set_error,
583                                                    set);
584   GNUNET_assert (NULL != set->mq);
585
586   if (NULL == cookie)
587   {
588     LOG (GNUNET_ERROR_TYPE_DEBUG,
589          "Creating new set (operation %u)\n",
590          op);
591     mqm = GNUNET_MQ_msg (create_msg,
592                          GNUNET_MESSAGE_TYPE_SET_CREATE);
593     create_msg->operation = htonl (op);
594   }
595   else
596   {
597     LOG (GNUNET_ERROR_TYPE_DEBUG,
598          "Creating new set (lazy copy)\n",
599          op);
600     mqm = GNUNET_MQ_msg (copy_msg,
601                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
602     copy_msg->cookie = *cookie;
603   }
604   GNUNET_MQ_send (set->mq, mqm);
605   return set;
606 }
607
608
609 /**
610  * Create an empty set, supporting the specified operation.
611  *
612  * @param cfg configuration to use for connecting to the
613  *        set service
614  * @param op operation supported by the set
615  *        Note that the operation has to be specified
616  *        beforehand, as certain set operations need to maintain
617  *        data structures spefific to the operation
618  * @return a handle to the set
619  */
620 struct GNUNET_SET_Handle *
621 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
622                    enum GNUNET_SET_OperationType op)
623 {
624   return create_internal (cfg, op, NULL);
625 }
626
627
628 /**
629  * Add an element to the given set.  After the element has been added
630  * (in the sense of being transmitted to the set service), @a cont
631  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
632  * queued.
633  *
634  * @param set set to add element to
635  * @param element element to add to the set
636  * @param cont continuation called after the element has been added
637  * @param cont_cls closure for @a cont
638  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
639  *         set is invalid (e.g. the set service crashed)
640  */
641 int
642 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
643                         const struct GNUNET_SET_Element *element,
644                         GNUNET_SET_Continuation cont,
645                         void *cont_cls)
646 {
647   struct GNUNET_MQ_Envelope *mqm;
648   struct GNUNET_SET_ElementMessage *msg;
649
650   if (GNUNET_YES == set->invalid)
651   {
652     if (NULL != cont)
653       cont (cont_cls);
654     return GNUNET_SYSERR;
655   }
656   mqm = GNUNET_MQ_msg_extra (msg, element->size,
657                              GNUNET_MESSAGE_TYPE_SET_ADD);
658   msg->element_type = htons (element->element_type);
659   memcpy (&msg[1],
660           element->data,
661           element->size);
662   GNUNET_MQ_notify_sent (mqm,
663                          cont, cont_cls);
664   GNUNET_MQ_send (set->mq, mqm);
665   return GNUNET_OK;
666 }
667
668
669 /**
670  * Remove an element to the given set.  After the element has been
671  * removed (in the sense of the request being transmitted to the set
672  * service), @a cont will be called.  Multiple calls to
673  * GNUNET_SET_remove_element() can be queued
674  *
675  * @param set set to remove element from
676  * @param element element to remove from the set
677  * @param cont continuation called after the element has been removed
678  * @param cont_cls closure for @a cont
679  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
680  *         set is invalid (e.g. the set service crashed)
681  */
682 int
683 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
684                            const struct GNUNET_SET_Element *element,
685                            GNUNET_SET_Continuation cont,
686                            void *cont_cls)
687 {
688   struct GNUNET_MQ_Envelope *mqm;
689   struct GNUNET_SET_ElementMessage *msg;
690
691   if (GNUNET_YES == set->invalid)
692   {
693     if (NULL != cont)
694       cont (cont_cls);
695     return GNUNET_SYSERR;
696   }
697   mqm = GNUNET_MQ_msg_extra (msg,
698                              element->size,
699                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
700   msg->element_type = htons (element->element_type);
701   memcpy (&msg[1],
702           element->data,
703           element->size);
704   GNUNET_MQ_notify_sent (mqm,
705                          cont, cont_cls);
706   GNUNET_MQ_send (set->mq, mqm);
707   return GNUNET_OK;
708 }
709
710
711 /**
712  * Destroy the set handle if no operations are left, mark the set
713  * for destruction otherwise.
714  *
715  * @param set set handle to destroy
716  */
717 void
718 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
719 {
720   /* destroying set while iterator is active is currently
721      not supported; we should expand the API to allow
722      clients to explicitly cancel the iteration! */
723   GNUNET_assert (NULL == set->iterator);
724   if (NULL != set->ops_head)
725   {
726     LOG (GNUNET_ERROR_TYPE_DEBUG,
727          "Set operations are pending, delaying set destruction\n");
728     set->destroy_requested = GNUNET_YES;
729     return;
730   }
731   LOG (GNUNET_ERROR_TYPE_DEBUG,
732        "Really destroying set\n");
733   if (NULL != set->client)
734   {
735     GNUNET_CLIENT_disconnect (set->client);
736     set->client = NULL;
737   }
738   if (NULL != set->mq)
739   {
740     GNUNET_MQ_destroy (set->mq);
741     set->mq = NULL;
742   }
743   GNUNET_free (set);
744 }
745
746
747 /**
748  * Prepare a set operation to be evaluated with another peer.
749  * The evaluation will not start until the client provides
750  * a local set with #GNUNET_SET_commit().
751  *
752  * @param other_peer peer with the other set
753  * @param app_id hash for the application using the set
754  * @param context_msg additional information for the request
755  * @param result_mode specified how results will be returned,
756  *        see `enum GNUNET_SET_ResultMode`.
757  * @param result_cb called on error or success
758  * @param result_cls closure for @e result_cb
759  * @return a handle to cancel the operation
760  */
761 struct GNUNET_SET_OperationHandle *
762 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
763                     const struct GNUNET_HashCode *app_id,
764                     const struct GNUNET_MessageHeader *context_msg,
765                     enum GNUNET_SET_ResultMode result_mode,
766                     GNUNET_SET_ResultIterator result_cb,
767                     void *result_cls)
768 {
769   struct GNUNET_MQ_Envelope *mqm;
770   struct GNUNET_SET_OperationHandle *oh;
771   struct GNUNET_SET_EvaluateMessage *msg;
772
773   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
774   oh->result_cb = result_cb;
775   oh->result_cls = result_cls;
776   mqm = GNUNET_MQ_msg_nested_mh (msg,
777                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
778                                  context_msg);
779   msg->app_id = *app_id;
780   msg->result_mode = htonl (result_mode);
781   msg->target_peer = *other_peer;
782   oh->conclude_mqm = mqm;
783   oh->request_id_addr = &msg->request_id;
784
785   return oh;
786 }
787
788
789 /**
790  * Connect to the set service in order to listen for requests.
791  *
792  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
793  */
794 static void
795 listen_connect (void *cls);
796
797
798 /**
799  * Handle request message for a listen operation
800  *
801  * @param cls the listen handle
802  * @param mh the message
803  */
804 static void
805 handle_request (void *cls,
806                 const struct GNUNET_MessageHeader *mh)
807 {
808   struct GNUNET_SET_ListenHandle *lh = cls;
809   const struct GNUNET_SET_RequestMessage *msg;
810   struct GNUNET_SET_Request req;
811   const struct GNUNET_MessageHeader *context_msg;
812   uint16_t msize;
813   struct GNUNET_MQ_Envelope *mqm;
814   struct GNUNET_SET_RejectMessage *rmsg;
815
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Processing incoming operation request\n");
818   msize = ntohs (mh->size);
819   if (msize < sizeof (struct GNUNET_SET_RequestMessage))
820   {
821     GNUNET_break (0);
822     GNUNET_CLIENT_disconnect (lh->client);
823     lh->client = NULL;
824     GNUNET_MQ_destroy (lh->mq);
825     lh->mq = NULL;
826     lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
827                                                        &listen_connect, lh);
828     lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
829     return;
830   }
831   /* we got another valid request => reset the backoff */
832   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
833   msg = (const struct GNUNET_SET_RequestMessage *) mh;
834   req.accept_id = ntohl (msg->accept_id);
835   req.accepted = GNUNET_NO;
836   context_msg = GNUNET_MQ_extract_nested_mh (msg);
837   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
838   lh->listen_cb (lh->listen_cls,
839                  &msg->peer_id,
840                  context_msg,
841                  &req);
842   if (GNUNET_YES == req.accepted)
843     return; /* the accept-case is handled in #GNUNET_SET_accept() */
844   LOG (GNUNET_ERROR_TYPE_DEBUG,
845        "Rejecting request\n");
846   mqm = GNUNET_MQ_msg (rmsg,
847                        GNUNET_MESSAGE_TYPE_SET_REJECT);
848   rmsg->accept_reject_id = msg->accept_id;
849   GNUNET_MQ_send (lh->mq, mqm);
850 }
851
852
853 /**
854  * Our connection with the set service encountered an error,
855  * re-initialize with exponential back-off.
856  *
857  * @param cls the `struct GNUNET_SET_ListenHandle *`
858  * @param error reason for the disconnect
859  */
860 static void
861 handle_client_listener_error (void *cls,
862                               enum GNUNET_MQ_Error error)
863 {
864   struct GNUNET_SET_ListenHandle *lh = cls;
865
866   LOG (GNUNET_ERROR_TYPE_DEBUG,
867        "Listener broke down (%d), re-connecting\n",
868        (int) error);
869   GNUNET_CLIENT_disconnect (lh->client);
870   lh->client = NULL;
871   GNUNET_MQ_destroy (lh->mq);
872   lh->mq = NULL;
873   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
874                                                      &listen_connect, lh);
875   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
876 }
877
878
879 /**
880  * Connect to the set service in order to listen for requests.
881  *
882  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
883  */
884 static void
885 listen_connect (void *cls)
886 {
887   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
888     { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
889     GNUNET_MQ_HANDLERS_END
890   };
891   struct GNUNET_SET_ListenHandle *lh = cls;
892   struct GNUNET_MQ_Envelope *mqm;
893   struct GNUNET_SET_ListenMessage *msg;
894   const struct GNUNET_SCHEDULER_TaskContext *tc;
895
896   tc = GNUNET_SCHEDULER_get_task_context ();
897   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
898   {
899     LOG (GNUNET_ERROR_TYPE_DEBUG,
900          "Listener not reconnecting due to shutdown\n");
901     return;
902   }
903   lh->reconnect_task = NULL;
904   GNUNET_assert (NULL == lh->client);
905   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
906   if (NULL == lh->client)
907     return;
908   GNUNET_assert (NULL == lh->mq);
909   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
910                                                   mq_handlers,
911                                                   &handle_client_listener_error, lh);
912   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
913   msg->operation = htonl (lh->operation);
914   msg->app_id = lh->app_id;
915   GNUNET_MQ_send (lh->mq, mqm);
916 }
917
918
919 /**
920  * Wait for set operation requests for the given application id
921  *
922  * @param cfg configuration to use for connecting to
923  *            the set service, needs to be valid for the lifetime of the listen handle
924  * @param operation operation we want to listen for
925  * @param app_id id of the application that handles set operation requests
926  * @param listen_cb called for each incoming request matching the operation
927  *                  and application id
928  * @param listen_cls handle for @a listen_cb
929  * @return a handle that can be used to cancel the listen operation
930  */
931 struct GNUNET_SET_ListenHandle *
932 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
933                    enum GNUNET_SET_OperationType operation,
934                    const struct GNUNET_HashCode *app_id,
935                    GNUNET_SET_ListenCallback listen_cb,
936                    void *listen_cls)
937 {
938   struct GNUNET_SET_ListenHandle *lh;
939
940   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
941   lh->listen_cb = listen_cb;
942   lh->listen_cls = listen_cls;
943   lh->cfg = cfg;
944   lh->operation = operation;
945   lh->app_id = *app_id;
946   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
947   listen_connect (lh);
948   if (NULL == lh->client)
949   {
950     GNUNET_free (lh);
951     return NULL;
952   }
953   return lh;
954 }
955
956
957 /**
958  * Cancel the given listen operation.
959  *
960  * @param lh handle for the listen operation
961  */
962 void
963 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
964 {
965   LOG (GNUNET_ERROR_TYPE_DEBUG,
966        "Canceling listener\n");
967   if (NULL != lh->mq)
968   {
969     GNUNET_MQ_destroy (lh->mq);
970     lh->mq = NULL;
971   }
972   if (NULL != lh->client)
973   {
974     GNUNET_CLIENT_disconnect (lh->client);
975     lh->client = NULL;
976   }
977   if (NULL != lh->reconnect_task)
978   {
979     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
980     lh->reconnect_task = NULL;
981   }
982   GNUNET_free (lh);
983 }
984
985
986 /**
987  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
988  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
989  * afterwards.
990  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
991  * and to begin the exchange with the remote peer.
992  *
993  * @param request request to accept
994  * @param result_mode specified how results will be returned,
995  *        see `enum GNUNET_SET_ResultMode`.
996  * @param result_cb callback for the results
997  * @param result_cls closure for @a result_cb
998  * @return a handle to cancel the operation
999  */
1000 struct GNUNET_SET_OperationHandle *
1001 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1002                    enum GNUNET_SET_ResultMode result_mode,
1003                    GNUNET_SET_ResultIterator result_cb,
1004                    void *result_cls)
1005 {
1006   struct GNUNET_MQ_Envelope *mqm;
1007   struct GNUNET_SET_OperationHandle *oh;
1008   struct GNUNET_SET_AcceptMessage *msg;
1009
1010   GNUNET_assert (GNUNET_NO == request->accepted);
1011   request->accepted = GNUNET_YES;
1012   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1013   msg->accept_reject_id = htonl (request->accept_id);
1014   msg->result_mode = htonl (result_mode);
1015   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1016   oh->result_cb = result_cb;
1017   oh->result_cls = result_cls;
1018   oh->conclude_mqm = mqm;
1019   oh->request_id_addr = &msg->request_id;
1020   return oh;
1021 }
1022
1023
1024 /**
1025  * Commit a set to be used with a set operation.
1026  * This function is called once we have fully constructed
1027  * the set that we want to use for the operation.  At this
1028  * time, the P2P protocol can then begin to exchange the
1029  * set information and call the result callback with the
1030  * result information.
1031  *
1032  * @param oh handle to the set operation
1033  * @param set the set to use for the operation
1034  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1035  *         set is invalid (e.g. the set service crashed)
1036  */
1037 int
1038 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1039                    struct GNUNET_SET_Handle *set)
1040 {
1041   if (NULL != oh->set)
1042   {
1043     /* Some other set was already commited for this
1044      * operation, there is a logic bug in the client of this API */
1045     GNUNET_break (0);
1046     return GNUNET_OK;
1047   }
1048   if (GNUNET_YES == set->invalid)
1049     return GNUNET_SYSERR;
1050   GNUNET_assert (NULL != oh->conclude_mqm);
1051   oh->set = set;
1052   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1053                                set->ops_tail,
1054                                oh);
1055   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
1056   *oh->request_id_addr = htonl (oh->request_id);
1057   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
1058   oh->conclude_mqm = NULL;
1059   oh->request_id_addr = NULL;
1060   return GNUNET_OK;
1061 }
1062
1063
1064 /**
1065  * Iterate over all elements in the given set.  Note that this
1066  * operation involves transferring every element of the set from the
1067  * service to the client, and is thus costly.
1068  *
1069  * @param set the set to iterate over
1070  * @param iter the iterator to call for each element
1071  * @param iter_cls closure for @a iter
1072  * @return #GNUNET_YES if the iteration started successfuly,
1073  *         #GNUNET_NO if another iteration is active
1074  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1075  */
1076 int
1077 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1078                     GNUNET_SET_ElementIterator iter,
1079                     void *iter_cls)
1080 {
1081   struct GNUNET_MQ_Envelope *ev;
1082
1083   GNUNET_assert (NULL != iter);
1084   if (GNUNET_YES == set->invalid)
1085     return GNUNET_SYSERR;
1086   if (NULL != set->iterator)
1087     return GNUNET_NO;
1088   LOG (GNUNET_ERROR_TYPE_DEBUG,
1089        "Iterating over set\n");
1090   set->iterator = iter;
1091   set->iterator_cls = iter_cls;
1092   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1093   GNUNET_MQ_send (set->mq, ev);
1094   return GNUNET_YES;
1095 }
1096
1097
1098 void
1099 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1100                       GNUNET_SET_CopyReadyCallback cb,
1101                       void *cls)
1102 {
1103   struct GNUNET_MQ_Envelope *ev;
1104   struct SetCopyRequest *req;
1105
1106   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1107   GNUNET_MQ_send (set->mq, ev);
1108
1109   req = GNUNET_new (struct SetCopyRequest);
1110   req->cb = cb;
1111   req->cls = cls;
1112   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1113                                set->copy_req_tail,
1114                                req);
1115 }
1116
1117
1118 /**
1119  * Create a copy of an element.  The copy
1120  * must be GNUNET_free-d by the caller.
1121  *
1122  * @param element the element to copy
1123  * @return the copied element
1124  */
1125 struct GNUNET_SET_Element *
1126 GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
1127 {
1128   struct GNUNET_SET_Element *copy;
1129
1130   copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
1131   copy->size = element->size;
1132   copy->element_type = element->element_type;
1133   copy->data = &copy[1];
1134   memcpy ((void *) copy->data, element->data, copy->size);
1135
1136   return copy;
1137 }
1138
1139
1140 /**
1141  * Hash a set element.
1142  *
1143  * @param element the element that should be hashed
1144  * @param ret_hash a pointer to where the hash of @a element
1145  *        should be stored
1146  */
1147 void
1148 GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
1149 {
1150   struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
1151
1152   /* It's not guaranteed that the element data is always after the element header,
1153      so we need to hash the chunks separately. */
1154   GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
1155   GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
1156   GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
1157   GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
1158 }
1159
1160 /* end of set_api.c */