390d35e5fe7273e04893b1aa424e26e5cda106eb
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_set_service.h"
31 #include "set.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35
36 /**
37  * Opaque handle to a set.
38  */
39 struct GNUNET_SET_Handle
40 {
41   /**
42    * Client connected to the set service.
43    */
44   struct GNUNET_CLIENT_Connection *client;
45
46   /**
47    * Message queue for @e client.
48    */
49   struct GNUNET_MQ_Handle *mq;
50
51   /**
52    * Linked list of operations on the set.
53    */
54   struct GNUNET_SET_OperationHandle *ops_head;
55
56   /**
57    * Linked list of operations on the set.
58    */
59   struct GNUNET_SET_OperationHandle *ops_tail;
60
61   /**
62    * Callback for the current iteration over the set,
63    * NULL if no iterator is active.
64    */
65   GNUNET_SET_ElementIterator iterator;
66
67   /**
68    * Closure for @e iterator
69    */
70   void *iterator_cls;
71
72   /**
73    * Should the set be destroyed once all operations are gone?
74    */
75   int destroy_requested;
76
77   /**
78    * Has the set become invalid (e.g. service died)?
79    */
80   int invalid;
81
82   /**
83    * Both client and service count the number of iterators
84    * created so far to match replies with iterators.
85    */
86   uint16_t iteration_id;
87 };
88
89
90 /**
91  * Handle for a set operation request from another peer.
92  */
93 struct GNUNET_SET_Request
94 {
95   /**
96    * Id of the request, used to identify the request when
97    * accepting/rejecting it.
98    */
99   uint32_t accept_id;
100
101   /**
102    * Has the request been accepted already?
103    * #GNUNET_YES/#GNUNET_NO
104    */
105   int accepted;
106 };
107
108
109 /**
110  * Handle to an operation.  Only known to the service after committing
111  * the handle with a set.
112  */
113 struct GNUNET_SET_OperationHandle
114 {
115   /**
116    * Function to be called when we have a result,
117    * or an error.
118    */
119   GNUNET_SET_ResultIterator result_cb;
120
121   /**
122    * Closure for @e result_cb.
123    */
124   void *result_cls;
125
126   /**
127    * Local set used for the operation,
128    * NULL if no set has been provided by conclude yet.
129    */
130   struct GNUNET_SET_Handle *set;
131
132   /**
133    * Message sent to the server on calling conclude,
134    * NULL if conclude has been called.
135    */
136   struct GNUNET_MQ_Envelope *conclude_mqm;
137
138   /**
139    * Address of the request if in the conclude message,
140    * used to patch the request id into the message when the set is known.
141    */
142   uint32_t *request_id_addr;
143
144   /**
145    * Handles are kept in a linked list.
146    */
147   struct GNUNET_SET_OperationHandle *prev;
148
149   /**
150    * Handles are kept in a linked list.
151    */
152   struct GNUNET_SET_OperationHandle *next;
153
154   /**
155    * Request ID to identify the operation within the set.
156    */
157   uint32_t request_id;
158 };
159
160
161 /**
162  * Opaque handle to a listen operation.
163  */
164 struct GNUNET_SET_ListenHandle
165 {
166   /**
167    * Connection to the service.
168    */
169   struct GNUNET_CLIENT_Connection *client;
170
171   /**
172    * Message queue for the client.
173    */
174   struct GNUNET_MQ_Handle* mq;
175
176   /**
177    * Configuration handle for the listener, stored
178    * here to be able to reconnect transparently on
179    * connection failure.
180    */
181   const struct GNUNET_CONFIGURATION_Handle *cfg;
182
183   /**
184    * Function to call on a new incoming request,
185    * or on error.
186    */
187   GNUNET_SET_ListenCallback listen_cb;
188
189   /**
190    * Closure for @e listen_cb.
191    */
192   void *listen_cls;
193
194   /**
195    * Application ID we listen for.
196    */
197   struct GNUNET_HashCode app_id;
198
199   /**
200    * Time to wait until we try to reconnect on failure.
201    */
202   struct GNUNET_TIME_Relative reconnect_backoff;
203
204   /**
205    * Task for reconnecting when the listener fails.
206    */
207   struct GNUNET_SCHEDULER_Task * reconnect_task;
208
209   /**
210    * Operation we listen for.
211    */
212   enum GNUNET_SET_OperationType operation;
213 };
214
215
216 /**
217  * Handle element for iteration over the set.  Notifies the
218  * iterator and sends an acknowledgement to the service.
219  *
220  * @param cls the `struct GNUNET_SET_Handle *`
221  * @param mh the message
222  */
223 static void
224 handle_iter_element (void *cls,
225                      const struct GNUNET_MessageHeader *mh)
226 {
227   struct GNUNET_SET_Handle *set = cls;
228   GNUNET_SET_ElementIterator iter = set->iterator;
229   struct GNUNET_SET_Element element;
230   const struct GNUNET_SET_IterResponseMessage *msg;
231   struct GNUNET_SET_IterAckMessage *ack_msg;
232   struct GNUNET_MQ_Envelope *ev;
233   uint16_t msize;
234
235   msize = ntohs (mh->size);
236   if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
237   {
238     /* message malformed */
239     GNUNET_break (0);
240     set->iterator = NULL;
241     set->iteration_id++;
242     iter (set->iterator_cls,
243           NULL);
244     iter = NULL;
245   }
246   msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
247   if (set->iteration_id != ntohs (msg->iteration_id))
248   {
249     /* element from a previous iteration, skip! */
250     iter = NULL;
251   }
252   if (NULL != iter)
253   {
254     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
255     element.element_type = htons (msg->element_type);
256     element.data = &msg[1];
257     iter (set->iterator_cls,
258           &element);
259   }
260   ev = GNUNET_MQ_msg (ack_msg,
261                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
262   ack_msg->send_more = htonl ((NULL != iter));
263   GNUNET_MQ_send (set->mq, ev);
264 }
265
266
267 /**
268  * Handle message signalling conclusion of iteration over the set.
269  * Notifies the iterator that we are done.
270  *
271  * @param cls the set
272  * @param mh the message
273  */
274 static void
275 handle_iter_done (void *cls,
276                   const struct GNUNET_MessageHeader *mh)
277 {
278   struct GNUNET_SET_Handle *set = cls;
279   GNUNET_SET_ElementIterator iter = set->iterator;
280
281   if (NULL == iter)
282     return;
283   set->iterator = NULL;
284   set->iteration_id++;
285   iter (set->iterator_cls,
286         NULL);
287 }
288
289
290 /**
291  * Handle result message for a set operation.
292  *
293  * @param cls the set
294  * @param mh the message
295  */
296 static void
297 handle_result (void *cls,
298                const struct GNUNET_MessageHeader *mh)
299 {
300   struct GNUNET_SET_Handle *set = cls;
301   const struct GNUNET_SET_ResultMessage *msg;
302   struct GNUNET_SET_OperationHandle *oh;
303   struct GNUNET_SET_Element e;
304   enum GNUNET_SET_Status result_status;
305
306   msg = (const struct GNUNET_SET_ResultMessage *) mh;
307   GNUNET_assert (NULL != set->mq);
308   result_status = ntohs (msg->result_status);
309   LOG (GNUNET_ERROR_TYPE_DEBUG,
310        "Got result message with status %d\n",
311        result_status);
312
313   oh = GNUNET_MQ_assoc_get (set->mq,
314                             ntohl (msg->request_id));
315   if (NULL == oh)
316   {
317     /* 'oh' can be NULL if we canceled the operation, but the service
318        did not get the cancel message yet. */
319     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
320                 "Ignoring result from canceled operation\n");
321     return;
322   }
323   if (GNUNET_SET_STATUS_OK != result_status)
324   {
325     /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
326      * and this is the last result message we get */
327     GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
328     GNUNET_CONTAINER_DLL_remove (set->ops_head,
329                                  set->ops_tail,
330                                  oh);
331     if ( (GNUNET_YES == set->destroy_requested) &&
332          (NULL == set->ops_head) )
333       GNUNET_SET_destroy (set);
334     if (NULL != oh->result_cb)
335       oh->result_cb (oh->result_cls,
336                      NULL,
337                      result_status);
338     switch (result_status)
339     {
340     case GNUNET_SET_STATUS_OK:
341       break;
342     case GNUNET_SET_STATUS_FAILURE:
343       oh->result_cb = NULL;
344       break;
345     case GNUNET_SET_STATUS_HALF_DONE:
346       break;
347     case GNUNET_SET_STATUS_DONE:
348       oh->result_cb = NULL;
349       break;
350     }
351     GNUNET_free (oh);
352     return;
353   }
354   e.data = &msg[1];
355   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
356   e.element_type = msg->element_type;
357   if (NULL != oh->result_cb)
358     oh->result_cb (oh->result_cls,
359                    &e,
360                    result_status);
361 }
362
363
364 /**
365  * Destroy the given set operation.
366  *
367  * @param oh set operation to destroy
368  */
369 static void
370 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
371 {
372   struct GNUNET_SET_Handle *set = oh->set;
373   struct GNUNET_SET_OperationHandle *h_assoc;
374
375   if (NULL != oh->conclude_mqm)
376     GNUNET_MQ_discard (oh->conclude_mqm);
377   /* is the operation already commited? */
378   if (NULL != set)
379   {
380     GNUNET_CONTAINER_DLL_remove (set->ops_head,
381                                  set->ops_tail,
382                                  oh);
383     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
384                                       oh->request_id);
385     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
386   }
387   GNUNET_free (oh);
388 }
389
390
391 /**
392  * Cancel the given set operation.  We need to send an explicit cancel
393  * message, as all operations one one set communicate using one
394  * handle.
395  *
396  * @param oh set operation to cancel
397  */
398 void
399 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
400 {
401   struct GNUNET_SET_Handle *set = oh->set;
402   struct GNUNET_SET_CancelMessage *m;
403   struct GNUNET_MQ_Envelope *mqm;
404
405   if (NULL != set)
406   {
407     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
408     m->request_id = htonl (oh->request_id);
409     GNUNET_MQ_send (set->mq, mqm);
410   }
411   set_operation_destroy (oh);
412   if ( (NULL != set) &&
413        (GNUNET_YES == set->destroy_requested) &&
414        (NULL == set->ops_head) )
415   {
416     LOG (GNUNET_ERROR_TYPE_DEBUG,
417          "Destroying set after operation cancel\n");
418     GNUNET_SET_destroy (set);
419   }
420 }
421
422
423 /**
424  * We encountered an error communicating with the set service while
425  * performing a set operation. Report to the application.
426  *
427  * @param cls the `struct GNUNET_SET_Handle`
428  * @param error error code
429  */
430 static void
431 handle_client_set_error (void *cls,
432                          enum GNUNET_MQ_Error error)
433 {
434   struct GNUNET_SET_Handle *set = cls;
435
436   LOG (GNUNET_ERROR_TYPE_DEBUG,
437        "Handling client set error %d\n",
438        error);
439   while (NULL != set->ops_head)
440   {
441     if (NULL != set->ops_head->result_cb)
442       set->ops_head->result_cb (set->ops_head->result_cls,
443                                 NULL,
444                                 GNUNET_SET_STATUS_FAILURE);
445     set_operation_destroy (set->ops_head);
446   }
447   set->invalid = GNUNET_YES;
448   if (GNUNET_YES == set->destroy_requested)
449   {
450     LOG (GNUNET_ERROR_TYPE_DEBUG,
451          "Destroying set after operation failure\n");
452     GNUNET_SET_destroy (set);
453   }
454 }
455
456
457 /**
458  * Create an empty set, supporting the specified operation.
459  *
460  * @param cfg configuration to use for connecting to the
461  *        set service
462  * @param op operation supported by the set
463  *        Note that the operation has to be specified
464  *        beforehand, as certain set operations need to maintain
465  *        data structures spefific to the operation
466  * @return a handle to the set
467  */
468 struct GNUNET_SET_Handle *
469 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
470                    enum GNUNET_SET_OperationType op)
471 {
472   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
473     { &handle_result,
474       GNUNET_MESSAGE_TYPE_SET_RESULT,
475       0 },
476     { &handle_iter_element,
477       GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
478       0 },
479     { &handle_iter_done,
480       GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
481       sizeof (struct GNUNET_MessageHeader) },
482     GNUNET_MQ_HANDLERS_END
483   };
484   struct GNUNET_SET_Handle *set;
485   struct GNUNET_MQ_Envelope *mqm;
486   struct GNUNET_SET_CreateMessage *msg;
487
488   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489               "Creating new set (operation %u)\n",
490               op);
491   set = GNUNET_new (struct GNUNET_SET_Handle);
492   set->client = GNUNET_CLIENT_connect ("set", cfg);
493   if (NULL == set->client)
494   {
495     GNUNET_free (set);
496     return NULL;
497   }
498   set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
499                                                    mq_handlers,
500                                                    &handle_client_set_error,
501                                                    set);
502   GNUNET_assert (NULL != set->mq);
503   mqm = GNUNET_MQ_msg (msg,
504                        GNUNET_MESSAGE_TYPE_SET_CREATE);
505   msg->operation = htonl (op);
506   GNUNET_MQ_send (set->mq, mqm);
507   return set;
508 }
509
510
511 /**
512  * Add an element to the given set.  After the element has been added
513  * (in the sense of being transmitted to the set service), @a cont
514  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
515  * queued.
516  *
517  * @param set set to add element to
518  * @param element element to add to the set
519  * @param cont continuation called after the element has been added
520  * @param cont_cls closure for @a cont
521  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
522  *         set is invalid (e.g. the set service crashed)
523  */
524 int
525 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
526                         const struct GNUNET_SET_Element *element,
527                         GNUNET_SET_Continuation cont,
528                         void *cont_cls)
529 {
530   struct GNUNET_MQ_Envelope *mqm;
531   struct GNUNET_SET_ElementMessage *msg;
532
533   if (GNUNET_YES == set->invalid)
534   {
535     if (NULL != cont)
536       cont (cont_cls);
537     return GNUNET_SYSERR;
538   }
539   mqm = GNUNET_MQ_msg_extra (msg, element->size,
540                              GNUNET_MESSAGE_TYPE_SET_ADD);
541   msg->element_type = element->element_type;
542   memcpy (&msg[1],
543           element->data,
544           element->size);
545   GNUNET_MQ_notify_sent (mqm,
546                          cont, cont_cls);
547   GNUNET_MQ_send (set->mq, mqm);
548   return GNUNET_OK;
549 }
550
551
552 /**
553  * Remove an element to the given set.  After the element has been
554  * removed (in the sense of the request being transmitted to the set
555  * service), @a cont will be called.  Multiple calls to
556  * GNUNET_SET_remove_element() can be queued
557  *
558  * @param set set to remove element from
559  * @param element element to remove from the set
560  * @param cont continuation called after the element has been removed
561  * @param cont_cls closure for @a cont
562  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
563  *         set is invalid (e.g. the set service crashed)
564  */
565 int
566 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
567                            const struct GNUNET_SET_Element *element,
568                            GNUNET_SET_Continuation cont,
569                            void *cont_cls)
570 {
571   struct GNUNET_MQ_Envelope *mqm;
572   struct GNUNET_SET_ElementMessage *msg;
573
574   if (GNUNET_YES == set->invalid)
575   {
576     if (NULL != cont)
577       cont (cont_cls);
578     return GNUNET_SYSERR;
579   }
580   mqm = GNUNET_MQ_msg_extra (msg,
581                              element->size,
582                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
583   msg->element_type = element->element_type;
584   memcpy (&msg[1],
585           element->data,
586           element->size);
587   GNUNET_MQ_notify_sent (mqm,
588                          cont, cont_cls);
589   GNUNET_MQ_send (set->mq, mqm);
590   return GNUNET_OK;
591 }
592
593
594 /**
595  * Destroy the set handle if no operations are left, mark the set
596  * for destruction otherwise.
597  *
598  * @param set set handle to destroy
599  */
600 void
601 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
602 {
603   /* destroying set while iterator is active is currently
604      not supported; we should expand the API to allow
605      clients to explicitly cancel the iteration! */
606   GNUNET_assert (NULL == set->iterator);
607   if (NULL != set->ops_head)
608   {
609     LOG (GNUNET_ERROR_TYPE_DEBUG,
610          "Set operations are pending, delaying set destruction\n");
611     set->destroy_requested = GNUNET_YES;
612     return;
613   }
614   LOG (GNUNET_ERROR_TYPE_DEBUG,
615        "Really destroying set\n");
616   if (NULL != set->client)
617   {
618     GNUNET_CLIENT_disconnect (set->client);
619     set->client = NULL;
620   }
621   if (NULL != set->mq)
622   {
623     GNUNET_MQ_destroy (set->mq);
624     set->mq = NULL;
625   }
626   GNUNET_free (set);
627 }
628
629
630 /**
631  * Prepare a set operation to be evaluated with another peer.
632  * The evaluation will not start until the client provides
633  * a local set with #GNUNET_SET_commit().
634  *
635  * @param other_peer peer with the other set
636  * @param app_id hash for the application using the set
637  * @param context_msg additional information for the request
638  * @param result_mode specified how results will be returned,
639  *        see `enum GNUNET_SET_ResultMode`.
640  * @param result_cb called on error or success
641  * @param result_cls closure for @e result_cb
642  * @return a handle to cancel the operation
643  */
644 struct GNUNET_SET_OperationHandle *
645 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
646                     const struct GNUNET_HashCode *app_id,
647                     const struct GNUNET_MessageHeader *context_msg,
648                     enum GNUNET_SET_ResultMode result_mode,
649                     GNUNET_SET_ResultIterator result_cb,
650                     void *result_cls)
651 {
652   struct GNUNET_MQ_Envelope *mqm;
653   struct GNUNET_SET_OperationHandle *oh;
654   struct GNUNET_SET_EvaluateMessage *msg;
655
656   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
657   oh->result_cb = result_cb;
658   oh->result_cls = result_cls;
659   mqm = GNUNET_MQ_msg_nested_mh (msg,
660                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
661                                  context_msg);
662   msg->app_id = *app_id;
663   msg->result_mode = htonl (result_mode);
664   msg->target_peer = *other_peer;
665   oh->conclude_mqm = mqm;
666   oh->request_id_addr = &msg->request_id;
667
668   return oh;
669 }
670
671
672 /**
673  * Connect to the set service in order to listen for requests.
674  *
675  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
676  * @param tc task context if invoked as a task, NULL otherwise
677  */
678 static void
679 listen_connect (void *cls,
680                 const struct GNUNET_SCHEDULER_TaskContext *tc);
681
682
683 /**
684  * Handle request message for a listen operation
685  *
686  * @param cls the listen handle
687  * @param mh the message
688  */
689 static void
690 handle_request (void *cls,
691                 const struct GNUNET_MessageHeader *mh)
692 {
693   struct GNUNET_SET_ListenHandle *lh = cls;
694   const struct GNUNET_SET_RequestMessage *msg;
695   struct GNUNET_SET_Request req;
696   const struct GNUNET_MessageHeader *context_msg;
697   uint16_t msize;
698   struct GNUNET_MQ_Envelope *mqm;
699   struct GNUNET_SET_RejectMessage *rmsg;
700
701   LOG (GNUNET_ERROR_TYPE_DEBUG,
702        "Processing incoming operation request\n");
703   msize = ntohs (mh->size);
704   if (msize < sizeof (struct GNUNET_SET_RequestMessage))
705   {
706     GNUNET_break (0);
707     GNUNET_CLIENT_disconnect (lh->client);
708     lh->client = NULL;
709     GNUNET_MQ_destroy (lh->mq);
710     lh->mq = NULL;
711     lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
712                                                        &listen_connect, lh);
713     lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
714     return;
715   }
716   /* we got another valid request => reset the backoff */
717   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
718   msg = (const struct GNUNET_SET_RequestMessage *) mh;
719   req.accept_id = ntohl (msg->accept_id);
720   req.accepted = GNUNET_NO;
721   context_msg = GNUNET_MQ_extract_nested_mh (msg);
722   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
723   lh->listen_cb (lh->listen_cls,
724                  &msg->peer_id,
725                  context_msg,
726                  &req);
727   if (GNUNET_YES == req.accepted)
728     return; /* the accept-case is handled in #GNUNET_SET_accept() */
729   LOG (GNUNET_ERROR_TYPE_DEBUG,
730        "Rejecting request\n");
731   mqm = GNUNET_MQ_msg (rmsg,
732                        GNUNET_MESSAGE_TYPE_SET_REJECT);
733   rmsg->accept_reject_id = msg->accept_id;
734   GNUNET_MQ_send (lh->mq, mqm);
735 }
736
737
738 /**
739  * Our connection with the set service encountered an error,
740  * re-initialize with exponential back-off.
741  *
742  * @param cls the `struct GNUNET_SET_ListenHandle *`
743  * @param error reason for the disconnect
744  */
745 static void
746 handle_client_listener_error (void *cls,
747                               enum GNUNET_MQ_Error error)
748 {
749   struct GNUNET_SET_ListenHandle *lh = cls;
750
751   LOG (GNUNET_ERROR_TYPE_DEBUG,
752        "Listener broke down (%d), re-connecting\n",
753        (int) error);
754   GNUNET_CLIENT_disconnect (lh->client);
755   lh->client = NULL;
756   GNUNET_MQ_destroy (lh->mq);
757   lh->mq = NULL;
758   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
759                                                      &listen_connect, lh);
760   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
761 }
762
763
764 /**
765  * Connect to the set service in order to listen for requests.
766  *
767  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
768  * @param tc task context if invoked as a task, NULL otherwise
769  */
770 static void
771 listen_connect (void *cls,
772                 const struct GNUNET_SCHEDULER_TaskContext *tc)
773 {
774   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
775     { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
776     GNUNET_MQ_HANDLERS_END
777   };
778   struct GNUNET_SET_ListenHandle *lh = cls;
779   struct GNUNET_MQ_Envelope *mqm;
780   struct GNUNET_SET_ListenMessage *msg;
781
782   if ( (NULL != tc) &&
783        (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
784   {
785     LOG (GNUNET_ERROR_TYPE_DEBUG,
786          "Listener not reconnecting due to shutdown\n");
787     return;
788   }
789   lh->reconnect_task = NULL;
790   GNUNET_assert (NULL == lh->client);
791   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
792   if (NULL == lh->client)
793     return;
794   GNUNET_assert (NULL == lh->mq);
795   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
796                                                   mq_handlers,
797                                                   &handle_client_listener_error, lh);
798   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
799   msg->operation = htonl (lh->operation);
800   msg->app_id = lh->app_id;
801   GNUNET_MQ_send (lh->mq, mqm);
802 }
803
804
805 /**
806  * Wait for set operation requests for the given application id
807  *
808  * @param cfg configuration to use for connecting to
809  *            the set service, needs to be valid for the lifetime of the listen handle
810  * @param operation operation we want to listen for
811  * @param app_id id of the application that handles set operation requests
812  * @param listen_cb called for each incoming request matching the operation
813  *                  and application id
814  * @param listen_cls handle for @a listen_cb
815  * @return a handle that can be used to cancel the listen operation
816  */
817 struct GNUNET_SET_ListenHandle *
818 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
819                    enum GNUNET_SET_OperationType operation,
820                    const struct GNUNET_HashCode *app_id,
821                    GNUNET_SET_ListenCallback listen_cb,
822                    void *listen_cls)
823 {
824   struct GNUNET_SET_ListenHandle *lh;
825
826   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
827   lh->listen_cb = listen_cb;
828   lh->listen_cls = listen_cls;
829   lh->cfg = cfg;
830   lh->operation = operation;
831   lh->app_id = *app_id;
832   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
833   listen_connect (lh, NULL);
834   if (NULL == lh->client)
835   {
836     GNUNET_free (lh);
837     return NULL;
838   }
839   return lh;
840 }
841
842
843 /**
844  * Cancel the given listen operation.
845  *
846  * @param lh handle for the listen operation
847  */
848 void
849 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
850 {
851   LOG (GNUNET_ERROR_TYPE_DEBUG,
852        "Canceling listener\n");
853   if (NULL != lh->mq)
854   {
855     GNUNET_MQ_destroy (lh->mq);
856     lh->mq = NULL;
857   }
858   if (NULL != lh->client)
859   {
860     GNUNET_CLIENT_disconnect (lh->client);
861     lh->client = NULL;
862   }
863   if (NULL != lh->reconnect_task)
864   {
865     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
866     lh->reconnect_task = NULL;
867   }
868   GNUNET_free (lh);
869 }
870
871
872 /**
873  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
874  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
875  * afterwards.
876  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
877  * and to begin the exchange with the remote peer.
878  *
879  * @param request request to accept
880  * @param result_mode specified how results will be returned,
881  *        see `enum GNUNET_SET_ResultMode`.
882  * @param result_cb callback for the results
883  * @param result_cls closure for @a result_cb
884  * @return a handle to cancel the operation
885  */
886 struct GNUNET_SET_OperationHandle *
887 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
888                    enum GNUNET_SET_ResultMode result_mode,
889                    GNUNET_SET_ResultIterator result_cb,
890                    void *result_cls)
891 {
892   struct GNUNET_MQ_Envelope *mqm;
893   struct GNUNET_SET_OperationHandle *oh;
894   struct GNUNET_SET_AcceptMessage *msg;
895
896   GNUNET_assert (GNUNET_NO == request->accepted);
897   request->accepted = GNUNET_YES;
898   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
899   msg->accept_reject_id = htonl (request->accept_id);
900   msg->result_mode = htonl (result_mode);
901   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
902   oh->result_cb = result_cb;
903   oh->result_cls = result_cls;
904   oh->conclude_mqm = mqm;
905   oh->request_id_addr = &msg->request_id;
906   return oh;
907 }
908
909
910 /**
911  * Commit a set to be used with a set operation.
912  * This function is called once we have fully constructed
913  * the set that we want to use for the operation.  At this
914  * time, the P2P protocol can then begin to exchange the
915  * set information and call the result callback with the
916  * result information.
917  *
918  * @param oh handle to the set operation
919  * @param set the set to use for the operation
920  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
921  *         set is invalid (e.g. the set service crashed)
922  */
923 int
924 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
925                    struct GNUNET_SET_Handle *set)
926 {
927   GNUNET_assert (NULL == oh->set);
928   if (GNUNET_YES == set->invalid)
929     return GNUNET_SYSERR;
930   GNUNET_assert (NULL != oh->conclude_mqm);
931   oh->set = set;
932   GNUNET_CONTAINER_DLL_insert (set->ops_head,
933                                set->ops_tail,
934                                oh);
935   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
936   *oh->request_id_addr = htonl (oh->request_id);
937   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
938   oh->conclude_mqm = NULL;
939   oh->request_id_addr = NULL;
940   return GNUNET_OK;
941 }
942
943
944 /**
945  * Iterate over all elements in the given set.  Note that this
946  * operation involves transferring every element of the set from the
947  * service to the client, and is thus costly.
948  *
949  * @param set the set to iterate over
950  * @param iter the iterator to call for each element
951  * @param iter_cls closure for @a iter
952  * @return #GNUNET_YES if the iteration started successfuly,
953  *         #GNUNET_NO if another iteration is active
954  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
955  */
956 int
957 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
958                     GNUNET_SET_ElementIterator iter,
959                     void *iter_cls)
960 {
961   struct GNUNET_MQ_Envelope *ev;
962
963   GNUNET_assert (NULL != iter);
964   if (GNUNET_YES == set->invalid)
965     return GNUNET_SYSERR;
966   if (NULL != set->iterator)
967     return GNUNET_NO;
968   LOG (GNUNET_ERROR_TYPE_DEBUG,
969        "Iterating over set\n");
970   set->iterator = iter;
971   set->iterator_cls = iter_cls;
972   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
973   GNUNET_MQ_send (set->mq, ev);
974   return GNUNET_YES;
975 }
976
977
978 /**
979  * Stop iteration over all elements in the given set.  Can only
980  * be called before the iteration has "naturally" completed its
981  * turn.
982  *
983  * @param set the set to stop iterating over
984  */
985 void
986 GNUNET_SET_iterate_cancel (struct GNUNET_SET_Handle *set)
987 {
988   GNUNET_assert (NULL != set->iterator);
989   set->iterator = NULL;
990   set->iteration_id++;
991 }
992
993
994 /* end of set_api.c */