-indentation and comment fixes
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_set_service.h"
31 #include "set.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35
36 /**
37  * Opaque handle to a set.
38  */
39 struct GNUNET_SET_Handle
40 {
41   /**
42    * Client connected to the set service.
43    */
44   struct GNUNET_CLIENT_Connection *client;
45
46   /**
47    * Message queue for @e client.
48    */
49   struct GNUNET_MQ_Handle *mq;
50
51   /**
52    * Linked list of operations on the set.
53    */
54   struct GNUNET_SET_OperationHandle *ops_head;
55
56   /**
57    * Linked list of operations on the set.
58    */
59   struct GNUNET_SET_OperationHandle *ops_tail;
60
61   /**
62    * Callback for the current iteration over the set,
63    * NULL if no iterator is active.
64    */
65   GNUNET_SET_ElementIterator iterator;
66
67   /**
68    * Closure for @e iterator
69    */
70   void *iterator_cls;
71
72   /**
73    * Should the set be destroyed once all operations are gone?
74    */
75   int destroy_requested;
76
77   /**
78    * Has the set become invalid (e.g. service died)?
79    */
80   int invalid;
81
82   /**
83    * Both client and service count the number of iterators
84    * created so far to match replies with iterators.
85    */
86   uint16_t iteration_id;
87 };
88
89
90 /**
91  * Handle for a set operation request from another peer.
92  */
93 struct GNUNET_SET_Request
94 {
95   /**
96    * Id of the request, used to identify the request when
97    * accepting/rejecting it.
98    */
99   uint32_t accept_id;
100
101   /**
102    * Has the request been accepted already?
103    * #GNUNET_YES/#GNUNET_NO
104    */
105   int accepted;
106 };
107
108
109 /**
110  * Handle to an operation.  Only known to the service after committing
111  * the handle with a set.
112  */
113 struct GNUNET_SET_OperationHandle
114 {
115   /**
116    * Function to be called when we have a result,
117    * or an error.
118    */
119   GNUNET_SET_ResultIterator result_cb;
120
121   /**
122    * Closure for @e result_cb.
123    */
124   void *result_cls;
125
126   /**
127    * Local set used for the operation,
128    * NULL if no set has been provided by conclude yet.
129    */
130   struct GNUNET_SET_Handle *set;
131
132   /**
133    * Message sent to the server on calling conclude,
134    * NULL if conclude has been called.
135    */
136   struct GNUNET_MQ_Envelope *conclude_mqm;
137
138   /**
139    * Address of the request if in the conclude message,
140    * used to patch the request id into the message when the set is known.
141    */
142   uint32_t *request_id_addr;
143
144   /**
145    * Handles are kept in a linked list.
146    */
147   struct GNUNET_SET_OperationHandle *prev;
148
149   /**
150    * Handles are kept in a linked list.
151    */
152   struct GNUNET_SET_OperationHandle *next;
153
154   /**
155    * Request ID to identify the operation within the set.
156    */
157   uint32_t request_id;
158 };
159
160
161 /**
162  * Opaque handle to a listen operation.
163  */
164 struct GNUNET_SET_ListenHandle
165 {
166   /**
167    * Connection to the service.
168    */
169   struct GNUNET_CLIENT_Connection *client;
170
171   /**
172    * Message queue for the client.
173    */
174   struct GNUNET_MQ_Handle* mq;
175
176   /**
177    * Configuration handle for the listener, stored
178    * here to be able to reconnect transparently on
179    * connection failure.
180    */
181   const struct GNUNET_CONFIGURATION_Handle *cfg;
182
183   /**
184    * Function to call on a new incoming request,
185    * or on error.
186    */
187   GNUNET_SET_ListenCallback listen_cb;
188
189   /**
190    * Closure for @e listen_cb.
191    */
192   void *listen_cls;
193
194   /**
195    * Application ID we listen for.
196    */
197   struct GNUNET_HashCode app_id;
198
199   /**
200    * Time to wait until we try to reconnect on failure.
201    */
202   struct GNUNET_TIME_Relative reconnect_backoff;
203
204   /**
205    * Task for reconnecting when the listener fails.
206    */
207   struct GNUNET_SCHEDULER_Task * reconnect_task;
208
209   /**
210    * Operation we listen for.
211    */
212   enum GNUNET_SET_OperationType operation;
213 };
214
215
216 /**
217  * Handle element for iteration over the set.  Notifies the
218  * iterator and sends an acknowledgement to the service.
219  *
220  * @param cls the `struct GNUNET_SET_Handle *`
221  * @param mh the message
222  */
223 static void
224 handle_iter_element (void *cls,
225                      const struct GNUNET_MessageHeader *mh)
226 {
227   struct GNUNET_SET_Handle *set = cls;
228   GNUNET_SET_ElementIterator iter = set->iterator;
229   struct GNUNET_SET_Element element;
230   const struct GNUNET_SET_IterResponseMessage *msg;
231   struct GNUNET_SET_IterAckMessage *ack_msg;
232   struct GNUNET_MQ_Envelope *ev;
233   uint16_t msize;
234
235   msize = ntohs (mh->size);
236   if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
237   {
238     /* message malformed */
239     GNUNET_break (0);
240     set->iterator = NULL;
241     set->iteration_id++;
242     iter (set->iterator_cls,
243           NULL);
244     iter = NULL;
245   }
246   msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
247   if (set->iteration_id != ntohs (msg->iteration_id))
248   {
249     /* element from a previous iteration, skip! */
250     iter = NULL;
251   }
252   if (NULL != iter)
253   {
254     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
255     element.element_type = htons (msg->element_type);
256     element.data = &msg[1];
257     iter (set->iterator_cls,
258           &element);
259   }
260   ev = GNUNET_MQ_msg (ack_msg,
261                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
262   ack_msg->send_more = htonl ((NULL != iter));
263   GNUNET_MQ_send (set->mq, ev);
264 }
265
266
267 /**
268  * Handle message signalling conclusion of iteration over the set.
269  * Notifies the iterator that we are done.
270  *
271  * @param cls the set
272  * @param mh the message
273  */
274 static void
275 handle_iter_done (void *cls,
276                   const struct GNUNET_MessageHeader *mh)
277 {
278   struct GNUNET_SET_Handle *set = cls;
279   GNUNET_SET_ElementIterator iter = set->iterator;
280
281   if (NULL == iter)
282     return;
283   set->iterator = NULL;
284   set->iteration_id++;
285   iter (set->iterator_cls,
286         NULL);
287 }
288
289
290 /**
291  * Handle result message for a set operation.
292  *
293  * @param cls the set
294  * @param mh the message
295  */
296 static void
297 handle_result (void *cls,
298                const struct GNUNET_MessageHeader *mh)
299 {
300   struct GNUNET_SET_Handle *set = cls;
301   const struct GNUNET_SET_ResultMessage *msg;
302   struct GNUNET_SET_OperationHandle *oh;
303   struct GNUNET_SET_Element e;
304   enum GNUNET_SET_Status result_status;
305
306   msg = (const struct GNUNET_SET_ResultMessage *) mh;
307   GNUNET_assert (NULL != set->mq);
308   result_status = ntohs (msg->result_status);
309   LOG (GNUNET_ERROR_TYPE_DEBUG,
310        "Got result message with status %d\n",
311        result_status);
312
313   oh = GNUNET_MQ_assoc_get (set->mq,
314                             ntohl (msg->request_id));
315   if (NULL == oh)
316   {
317     /* 'oh' can be NULL if we canceled the operation, but the service
318        did not get the cancel message yet. */
319     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
320                 "Ignoring result from canceled operation\n");
321     return;
322   }
323   if (GNUNET_SET_STATUS_OK != result_status)
324   {
325     /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
326      * and this is the last result message we get */
327     GNUNET_MQ_assoc_remove (set->mq,
328                             ntohl (msg->request_id));
329     GNUNET_CONTAINER_DLL_remove (set->ops_head,
330                                  set->ops_tail,
331                                  oh);
332     if ( (GNUNET_YES == set->destroy_requested) &&
333          (NULL == set->ops_head) )
334       GNUNET_SET_destroy (set);
335     if (NULL != oh->result_cb)
336       oh->result_cb (oh->result_cls,
337                      NULL,
338                      result_status);
339     switch (result_status)
340     {
341     case GNUNET_SET_STATUS_OK:
342       break;
343     case GNUNET_SET_STATUS_FAILURE:
344       oh->result_cb = NULL;
345       break;
346     case GNUNET_SET_STATUS_HALF_DONE:
347       break;
348     case GNUNET_SET_STATUS_DONE:
349       oh->result_cb = NULL;
350       break;
351     }
352     GNUNET_free (oh);
353     return;
354   }
355   e.data = &msg[1];
356   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
357   e.element_type = msg->element_type;
358   if (NULL != oh->result_cb)
359     oh->result_cb (oh->result_cls,
360                    &e,
361                    result_status);
362 }
363
364
365 /**
366  * Destroy the given set operation.
367  *
368  * @param oh set operation to destroy
369  */
370 static void
371 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
372 {
373   struct GNUNET_SET_Handle *set = oh->set;
374   struct GNUNET_SET_OperationHandle *h_assoc;
375
376   if (NULL != oh->conclude_mqm)
377     GNUNET_MQ_discard (oh->conclude_mqm);
378   /* is the operation already commited? */
379   if (NULL != set)
380   {
381     GNUNET_CONTAINER_DLL_remove (set->ops_head,
382                                  set->ops_tail,
383                                  oh);
384     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
385                                       oh->request_id);
386     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
387   }
388   GNUNET_free (oh);
389 }
390
391
392 /**
393  * Cancel the given set operation.  We need to send an explicit cancel
394  * message, as all operations one one set communicate using one
395  * handle.
396  *
397  * @param oh set operation to cancel
398  */
399 void
400 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
401 {
402   struct GNUNET_SET_Handle *set = oh->set;
403   struct GNUNET_SET_CancelMessage *m;
404   struct GNUNET_MQ_Envelope *mqm;
405
406   if (NULL != set)
407   {
408     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
409     m->request_id = htonl (oh->request_id);
410     GNUNET_MQ_send (set->mq, mqm);
411   }
412   set_operation_destroy (oh);
413   if ( (NULL != set) &&
414        (GNUNET_YES == set->destroy_requested) &&
415        (NULL == set->ops_head) )
416   {
417     LOG (GNUNET_ERROR_TYPE_DEBUG,
418          "Destroying set after operation cancel\n");
419     GNUNET_SET_destroy (set);
420   }
421 }
422
423
424 /**
425  * We encountered an error communicating with the set service while
426  * performing a set operation. Report to the application.
427  *
428  * @param cls the `struct GNUNET_SET_Handle`
429  * @param error error code
430  */
431 static void
432 handle_client_set_error (void *cls,
433                          enum GNUNET_MQ_Error error)
434 {
435   struct GNUNET_SET_Handle *set = cls;
436
437   LOG (GNUNET_ERROR_TYPE_DEBUG,
438        "Handling client set error %d\n",
439        error);
440   while (NULL != set->ops_head)
441   {
442     if (NULL != set->ops_head->result_cb)
443       set->ops_head->result_cb (set->ops_head->result_cls,
444                                 NULL,
445                                 GNUNET_SET_STATUS_FAILURE);
446     set_operation_destroy (set->ops_head);
447   }
448   set->invalid = GNUNET_YES;
449   if (GNUNET_YES == set->destroy_requested)
450   {
451     LOG (GNUNET_ERROR_TYPE_DEBUG,
452          "Destroying set after operation failure\n");
453     GNUNET_SET_destroy (set);
454   }
455 }
456
457
458 /**
459  * Create an empty set, supporting the specified operation.
460  *
461  * @param cfg configuration to use for connecting to the
462  *        set service
463  * @param op operation supported by the set
464  *        Note that the operation has to be specified
465  *        beforehand, as certain set operations need to maintain
466  *        data structures spefific to the operation
467  * @return a handle to the set
468  */
469 struct GNUNET_SET_Handle *
470 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
471                    enum GNUNET_SET_OperationType op)
472 {
473   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
474     { &handle_result,
475       GNUNET_MESSAGE_TYPE_SET_RESULT,
476       0 },
477     { &handle_iter_element,
478       GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
479       0 },
480     { &handle_iter_done,
481       GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
482       sizeof (struct GNUNET_MessageHeader) },
483     GNUNET_MQ_HANDLERS_END
484   };
485   struct GNUNET_SET_Handle *set;
486   struct GNUNET_MQ_Envelope *mqm;
487   struct GNUNET_SET_CreateMessage *msg;
488
489   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490               "Creating new set (operation %u)\n",
491               op);
492   set = GNUNET_new (struct GNUNET_SET_Handle);
493   set->client = GNUNET_CLIENT_connect ("set", cfg);
494   if (NULL == set->client)
495   {
496     GNUNET_free (set);
497     return NULL;
498   }
499   set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
500                                                    mq_handlers,
501                                                    &handle_client_set_error,
502                                                    set);
503   GNUNET_assert (NULL != set->mq);
504   mqm = GNUNET_MQ_msg (msg,
505                        GNUNET_MESSAGE_TYPE_SET_CREATE);
506   msg->operation = htonl (op);
507   GNUNET_MQ_send (set->mq, mqm);
508   return set;
509 }
510
511
512 /**
513  * Add an element to the given set.  After the element has been added
514  * (in the sense of being transmitted to the set service), @a cont
515  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
516  * queued.
517  *
518  * @param set set to add element to
519  * @param element element to add to the set
520  * @param cont continuation called after the element has been added
521  * @param cont_cls closure for @a cont
522  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
523  *         set is invalid (e.g. the set service crashed)
524  */
525 int
526 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
527                         const struct GNUNET_SET_Element *element,
528                         GNUNET_SET_Continuation cont,
529                         void *cont_cls)
530 {
531   struct GNUNET_MQ_Envelope *mqm;
532   struct GNUNET_SET_ElementMessage *msg;
533
534   if (GNUNET_YES == set->invalid)
535   {
536     if (NULL != cont)
537       cont (cont_cls);
538     return GNUNET_SYSERR;
539   }
540   mqm = GNUNET_MQ_msg_extra (msg, element->size,
541                              GNUNET_MESSAGE_TYPE_SET_ADD);
542   msg->element_type = element->element_type;
543   memcpy (&msg[1],
544           element->data,
545           element->size);
546   GNUNET_MQ_notify_sent (mqm,
547                          cont, cont_cls);
548   GNUNET_MQ_send (set->mq, mqm);
549   return GNUNET_OK;
550 }
551
552
553 /**
554  * Remove an element to the given set.  After the element has been
555  * removed (in the sense of the request being transmitted to the set
556  * service), @a cont will be called.  Multiple calls to
557  * GNUNET_SET_remove_element() can be queued
558  *
559  * @param set set to remove element from
560  * @param element element to remove from the set
561  * @param cont continuation called after the element has been removed
562  * @param cont_cls closure for @a cont
563  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
564  *         set is invalid (e.g. the set service crashed)
565  */
566 int
567 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
568                            const struct GNUNET_SET_Element *element,
569                            GNUNET_SET_Continuation cont,
570                            void *cont_cls)
571 {
572   struct GNUNET_MQ_Envelope *mqm;
573   struct GNUNET_SET_ElementMessage *msg;
574
575   if (GNUNET_YES == set->invalid)
576   {
577     if (NULL != cont)
578       cont (cont_cls);
579     return GNUNET_SYSERR;
580   }
581   mqm = GNUNET_MQ_msg_extra (msg,
582                              element->size,
583                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
584   msg->element_type = element->element_type;
585   memcpy (&msg[1],
586           element->data,
587           element->size);
588   GNUNET_MQ_notify_sent (mqm,
589                          cont, cont_cls);
590   GNUNET_MQ_send (set->mq, mqm);
591   return GNUNET_OK;
592 }
593
594
595 /**
596  * Destroy the set handle if no operations are left, mark the set
597  * for destruction otherwise.
598  *
599  * @param set set handle to destroy
600  */
601 void
602 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
603 {
604   /* destroying set while iterator is active is currently
605      not supported; we should expand the API to allow
606      clients to explicitly cancel the iteration! */
607   GNUNET_assert (NULL == set->iterator);
608   if (NULL != set->ops_head)
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG,
611          "Set operations are pending, delaying set destruction\n");
612     set->destroy_requested = GNUNET_YES;
613     return;
614   }
615   LOG (GNUNET_ERROR_TYPE_DEBUG,
616        "Really destroying set\n");
617   if (NULL != set->client)
618   {
619     GNUNET_CLIENT_disconnect (set->client);
620     set->client = NULL;
621   }
622   if (NULL != set->mq)
623   {
624     GNUNET_MQ_destroy (set->mq);
625     set->mq = NULL;
626   }
627   GNUNET_free (set);
628 }
629
630
631 /**
632  * Prepare a set operation to be evaluated with another peer.
633  * The evaluation will not start until the client provides
634  * a local set with #GNUNET_SET_commit().
635  *
636  * @param other_peer peer with the other set
637  * @param app_id hash for the application using the set
638  * @param context_msg additional information for the request
639  * @param result_mode specified how results will be returned,
640  *        see `enum GNUNET_SET_ResultMode`.
641  * @param result_cb called on error or success
642  * @param result_cls closure for @e result_cb
643  * @return a handle to cancel the operation
644  */
645 struct GNUNET_SET_OperationHandle *
646 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
647                     const struct GNUNET_HashCode *app_id,
648                     const struct GNUNET_MessageHeader *context_msg,
649                     enum GNUNET_SET_ResultMode result_mode,
650                     GNUNET_SET_ResultIterator result_cb,
651                     void *result_cls)
652 {
653   struct GNUNET_MQ_Envelope *mqm;
654   struct GNUNET_SET_OperationHandle *oh;
655   struct GNUNET_SET_EvaluateMessage *msg;
656
657   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
658   oh->result_cb = result_cb;
659   oh->result_cls = result_cls;
660   mqm = GNUNET_MQ_msg_nested_mh (msg,
661                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
662                                  context_msg);
663   msg->app_id = *app_id;
664   msg->result_mode = htonl (result_mode);
665   msg->target_peer = *other_peer;
666   oh->conclude_mqm = mqm;
667   oh->request_id_addr = &msg->request_id;
668
669   return oh;
670 }
671
672
673 /**
674  * Connect to the set service in order to listen for requests.
675  *
676  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
677  * @param tc task context if invoked as a task, NULL otherwise
678  */
679 static void
680 listen_connect (void *cls,
681                 const struct GNUNET_SCHEDULER_TaskContext *tc);
682
683
684 /**
685  * Handle request message for a listen operation
686  *
687  * @param cls the listen handle
688  * @param mh the message
689  */
690 static void
691 handle_request (void *cls,
692                 const struct GNUNET_MessageHeader *mh)
693 {
694   struct GNUNET_SET_ListenHandle *lh = cls;
695   const struct GNUNET_SET_RequestMessage *msg;
696   struct GNUNET_SET_Request req;
697   const struct GNUNET_MessageHeader *context_msg;
698   uint16_t msize;
699   struct GNUNET_MQ_Envelope *mqm;
700   struct GNUNET_SET_RejectMessage *rmsg;
701
702   LOG (GNUNET_ERROR_TYPE_DEBUG,
703        "Processing incoming operation request\n");
704   msize = ntohs (mh->size);
705   if (msize < sizeof (struct GNUNET_SET_RequestMessage))
706   {
707     GNUNET_break (0);
708     GNUNET_CLIENT_disconnect (lh->client);
709     lh->client = NULL;
710     GNUNET_MQ_destroy (lh->mq);
711     lh->mq = NULL;
712     lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
713                                                        &listen_connect, lh);
714     lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
715     return;
716   }
717   /* we got another valid request => reset the backoff */
718   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
719   msg = (const struct GNUNET_SET_RequestMessage *) mh;
720   req.accept_id = ntohl (msg->accept_id);
721   req.accepted = GNUNET_NO;
722   context_msg = GNUNET_MQ_extract_nested_mh (msg);
723   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
724   lh->listen_cb (lh->listen_cls,
725                  &msg->peer_id,
726                  context_msg,
727                  &req);
728   if (GNUNET_YES == req.accepted)
729     return; /* the accept-case is handled in #GNUNET_SET_accept() */
730   LOG (GNUNET_ERROR_TYPE_DEBUG,
731        "Rejecting request\n");
732   mqm = GNUNET_MQ_msg (rmsg,
733                        GNUNET_MESSAGE_TYPE_SET_REJECT);
734   rmsg->accept_reject_id = msg->accept_id;
735   GNUNET_MQ_send (lh->mq, mqm);
736 }
737
738
739 /**
740  * Our connection with the set service encountered an error,
741  * re-initialize with exponential back-off.
742  *
743  * @param cls the `struct GNUNET_SET_ListenHandle *`
744  * @param error reason for the disconnect
745  */
746 static void
747 handle_client_listener_error (void *cls,
748                               enum GNUNET_MQ_Error error)
749 {
750   struct GNUNET_SET_ListenHandle *lh = cls;
751
752   LOG (GNUNET_ERROR_TYPE_DEBUG,
753        "Listener broke down (%d), re-connecting\n",
754        (int) error);
755   GNUNET_CLIENT_disconnect (lh->client);
756   lh->client = NULL;
757   GNUNET_MQ_destroy (lh->mq);
758   lh->mq = NULL;
759   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
760                                                      &listen_connect, lh);
761   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
762 }
763
764
765 /**
766  * Connect to the set service in order to listen for requests.
767  *
768  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
769  * @param tc task context if invoked as a task, NULL otherwise
770  */
771 static void
772 listen_connect (void *cls,
773                 const struct GNUNET_SCHEDULER_TaskContext *tc)
774 {
775   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
776     { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
777     GNUNET_MQ_HANDLERS_END
778   };
779   struct GNUNET_SET_ListenHandle *lh = cls;
780   struct GNUNET_MQ_Envelope *mqm;
781   struct GNUNET_SET_ListenMessage *msg;
782
783   if ( (NULL != tc) &&
784        (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
785   {
786     LOG (GNUNET_ERROR_TYPE_DEBUG,
787          "Listener not reconnecting due to shutdown\n");
788     return;
789   }
790   lh->reconnect_task = NULL;
791   GNUNET_assert (NULL == lh->client);
792   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
793   if (NULL == lh->client)
794     return;
795   GNUNET_assert (NULL == lh->mq);
796   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
797                                                   mq_handlers,
798                                                   &handle_client_listener_error, lh);
799   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
800   msg->operation = htonl (lh->operation);
801   msg->app_id = lh->app_id;
802   GNUNET_MQ_send (lh->mq, mqm);
803 }
804
805
806 /**
807  * Wait for set operation requests for the given application id
808  *
809  * @param cfg configuration to use for connecting to
810  *            the set service, needs to be valid for the lifetime of the listen handle
811  * @param operation operation we want to listen for
812  * @param app_id id of the application that handles set operation requests
813  * @param listen_cb called for each incoming request matching the operation
814  *                  and application id
815  * @param listen_cls handle for @a listen_cb
816  * @return a handle that can be used to cancel the listen operation
817  */
818 struct GNUNET_SET_ListenHandle *
819 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
820                    enum GNUNET_SET_OperationType operation,
821                    const struct GNUNET_HashCode *app_id,
822                    GNUNET_SET_ListenCallback listen_cb,
823                    void *listen_cls)
824 {
825   struct GNUNET_SET_ListenHandle *lh;
826
827   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
828   lh->listen_cb = listen_cb;
829   lh->listen_cls = listen_cls;
830   lh->cfg = cfg;
831   lh->operation = operation;
832   lh->app_id = *app_id;
833   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
834   listen_connect (lh, NULL);
835   if (NULL == lh->client)
836   {
837     GNUNET_free (lh);
838     return NULL;
839   }
840   return lh;
841 }
842
843
844 /**
845  * Cancel the given listen operation.
846  *
847  * @param lh handle for the listen operation
848  */
849 void
850 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
851 {
852   LOG (GNUNET_ERROR_TYPE_DEBUG,
853        "Canceling listener\n");
854   if (NULL != lh->mq)
855   {
856     GNUNET_MQ_destroy (lh->mq);
857     lh->mq = NULL;
858   }
859   if (NULL != lh->client)
860   {
861     GNUNET_CLIENT_disconnect (lh->client);
862     lh->client = NULL;
863   }
864   if (NULL != lh->reconnect_task)
865   {
866     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
867     lh->reconnect_task = NULL;
868   }
869   GNUNET_free (lh);
870 }
871
872
873 /**
874  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
875  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
876  * afterwards.
877  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
878  * and to begin the exchange with the remote peer.
879  *
880  * @param request request to accept
881  * @param result_mode specified how results will be returned,
882  *        see `enum GNUNET_SET_ResultMode`.
883  * @param result_cb callback for the results
884  * @param result_cls closure for @a result_cb
885  * @return a handle to cancel the operation
886  */
887 struct GNUNET_SET_OperationHandle *
888 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
889                    enum GNUNET_SET_ResultMode result_mode,
890                    GNUNET_SET_ResultIterator result_cb,
891                    void *result_cls)
892 {
893   struct GNUNET_MQ_Envelope *mqm;
894   struct GNUNET_SET_OperationHandle *oh;
895   struct GNUNET_SET_AcceptMessage *msg;
896
897   GNUNET_assert (GNUNET_NO == request->accepted);
898   request->accepted = GNUNET_YES;
899   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
900   msg->accept_reject_id = htonl (request->accept_id);
901   msg->result_mode = htonl (result_mode);
902   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
903   oh->result_cb = result_cb;
904   oh->result_cls = result_cls;
905   oh->conclude_mqm = mqm;
906   oh->request_id_addr = &msg->request_id;
907   return oh;
908 }
909
910
911 /**
912  * Commit a set to be used with a set operation.
913  * This function is called once we have fully constructed
914  * the set that we want to use for the operation.  At this
915  * time, the P2P protocol can then begin to exchange the
916  * set information and call the result callback with the
917  * result information.
918  *
919  * @param oh handle to the set operation
920  * @param set the set to use for the operation
921  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
922  *         set is invalid (e.g. the set service crashed)
923  */
924 int
925 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
926                    struct GNUNET_SET_Handle *set)
927 {
928   GNUNET_assert (NULL == oh->set);
929   if (GNUNET_YES == set->invalid)
930     return GNUNET_SYSERR;
931   GNUNET_assert (NULL != oh->conclude_mqm);
932   oh->set = set;
933   GNUNET_CONTAINER_DLL_insert (set->ops_head,
934                                set->ops_tail,
935                                oh);
936   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
937   *oh->request_id_addr = htonl (oh->request_id);
938   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
939   oh->conclude_mqm = NULL;
940   oh->request_id_addr = NULL;
941   return GNUNET_OK;
942 }
943
944
945 /**
946  * Iterate over all elements in the given set.  Note that this
947  * operation involves transferring every element of the set from the
948  * service to the client, and is thus costly.
949  *
950  * @param set the set to iterate over
951  * @param iter the iterator to call for each element
952  * @param iter_cls closure for @a iter
953  * @return #GNUNET_YES if the iteration started successfuly,
954  *         #GNUNET_NO if another iteration is active
955  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
956  */
957 int
958 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
959                     GNUNET_SET_ElementIterator iter,
960                     void *iter_cls)
961 {
962   struct GNUNET_MQ_Envelope *ev;
963
964   GNUNET_assert (NULL != iter);
965   if (GNUNET_YES == set->invalid)
966     return GNUNET_SYSERR;
967   if (NULL != set->iterator)
968     return GNUNET_NO;
969   LOG (GNUNET_ERROR_TYPE_DEBUG,
970        "Iterating over set\n");
971   set->iterator = iter;
972   set->iterator_cls = iter_cls;
973   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
974   GNUNET_MQ_send (set->mq, ev);
975   return GNUNET_YES;
976 }
977
978
979 /**
980  * Stop iteration over all elements in the given set.  Can only
981  * be called before the iteration has "naturally" completed its
982  * turn.
983  *
984  * @param set the set to stop iterating over
985  */
986 void
987 GNUNET_SET_iterate_cancel (struct GNUNET_SET_Handle *set)
988 {
989   GNUNET_assert (NULL != set->iterator);
990   set->iterator = NULL;
991   set->iteration_id++;
992 }
993
994
995 /* end of set_api.c */