aadc9367866efbc5ebedcea09018cec872c102c7
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_set_service.h"
31 #include "set.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35
36 struct SetCopyRequest
37 {
38   struct SetCopyRequest *next;
39
40   struct SetCopyRequest *prev;
41
42   void *cls;
43
44   GNUNET_SET_CopyReadyCallback cb;
45 };
46
47 /**
48  * Opaque handle to a set.
49  */
50 struct GNUNET_SET_Handle
51 {
52   /**
53    * Client connected to the set service.
54    */
55   struct GNUNET_CLIENT_Connection *client;
56
57   /**
58    * Message queue for @e client.
59    */
60   struct GNUNET_MQ_Handle *mq;
61
62   /**
63    * Linked list of operations on the set.
64    */
65   struct GNUNET_SET_OperationHandle *ops_head;
66
67   /**
68    * Linked list of operations on the set.
69    */
70   struct GNUNET_SET_OperationHandle *ops_tail;
71
72   /**
73    * Callback for the current iteration over the set,
74    * NULL if no iterator is active.
75    */
76   GNUNET_SET_ElementIterator iterator;
77
78   /**
79    * Closure for @e iterator
80    */
81   void *iterator_cls;
82
83   /**
84    * Should the set be destroyed once all operations are gone?
85    */
86   int destroy_requested;
87
88   /**
89    * Has the set become invalid (e.g. service died)?
90    */
91   int invalid;
92
93   /**
94    * Both client and service count the number of iterators
95    * created so far to match replies with iterators.
96    */
97   uint16_t iteration_id;
98
99   /**
100    * Configuration, needed when creating (lazy) copies.
101    */
102   const struct GNUNET_CONFIGURATION_Handle *cfg;
103
104   /**
105    * Doubly linked list of copy requests.
106    */
107   struct SetCopyRequest *copy_req_head;
108
109   /**
110    * Doubly linked list of copy requests.
111    */
112   struct SetCopyRequest *copy_req_tail;
113 };
114
115
116 /**
117  * Handle for a set operation request from another peer.
118  */
119 struct GNUNET_SET_Request
120 {
121   /**
122    * Id of the request, used to identify the request when
123    * accepting/rejecting it.
124    */
125   uint32_t accept_id;
126
127   /**
128    * Has the request been accepted already?
129    * #GNUNET_YES/#GNUNET_NO
130    */
131   int accepted;
132 };
133
134
135 /**
136  * Handle to an operation.  Only known to the service after committing
137  * the handle with a set.
138  */
139 struct GNUNET_SET_OperationHandle
140 {
141   /**
142    * Function to be called when we have a result,
143    * or an error.
144    */
145   GNUNET_SET_ResultIterator result_cb;
146
147   /**
148    * Closure for @e result_cb.
149    */
150   void *result_cls;
151
152   /**
153    * Local set used for the operation,
154    * NULL if no set has been provided by conclude yet.
155    */
156   struct GNUNET_SET_Handle *set;
157
158   /**
159    * Message sent to the server on calling conclude,
160    * NULL if conclude has been called.
161    */
162   struct GNUNET_MQ_Envelope *conclude_mqm;
163
164   /**
165    * Address of the request if in the conclude message,
166    * used to patch the request id into the message when the set is known.
167    */
168   uint32_t *request_id_addr;
169
170   /**
171    * Handles are kept in a linked list.
172    */
173   struct GNUNET_SET_OperationHandle *prev;
174
175   /**
176    * Handles are kept in a linked list.
177    */
178   struct GNUNET_SET_OperationHandle *next;
179
180   /**
181    * Request ID to identify the operation within the set.
182    */
183   uint32_t request_id;
184 };
185
186
187 /**
188  * Opaque handle to a listen operation.
189  */
190 struct GNUNET_SET_ListenHandle
191 {
192   /**
193    * Connection to the service.
194    */
195   struct GNUNET_CLIENT_Connection *client;
196
197   /**
198    * Message queue for the client.
199    */
200   struct GNUNET_MQ_Handle* mq;
201
202   /**
203    * Configuration handle for the listener, stored
204    * here to be able to reconnect transparently on
205    * connection failure.
206    */
207   const struct GNUNET_CONFIGURATION_Handle *cfg;
208
209   /**
210    * Function to call on a new incoming request,
211    * or on error.
212    */
213   GNUNET_SET_ListenCallback listen_cb;
214
215   /**
216    * Closure for @e listen_cb.
217    */
218   void *listen_cls;
219
220   /**
221    * Application ID we listen for.
222    */
223   struct GNUNET_HashCode app_id;
224
225   /**
226    * Time to wait until we try to reconnect on failure.
227    */
228   struct GNUNET_TIME_Relative reconnect_backoff;
229
230   /**
231    * Task for reconnecting when the listener fails.
232    */
233   struct GNUNET_SCHEDULER_Task * reconnect_task;
234
235   /**
236    * Operation we listen for.
237    */
238   enum GNUNET_SET_OperationType operation;
239 };
240
241
242 /* mutual recursion with handle_copy_lazy */
243 static struct GNUNET_SET_Handle *
244 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
245                  enum GNUNET_SET_OperationType op,
246                  uint32_t *cookie);
247
248
249 /**
250  * Handle element for iteration over the set.  Notifies the
251  * iterator and sends an acknowledgement to the service.
252  *
253  * @param cls the `struct GNUNET_SET_Handle *`
254  * @param mh the message
255  */
256 static void
257 handle_copy_lazy (void *cls,
258                   const struct GNUNET_MessageHeader *mh)
259 {
260   struct GNUNET_SET_CopyLazyResponseMessage *msg;
261   struct GNUNET_SET_Handle *set = cls;
262   struct SetCopyRequest *req;
263   struct GNUNET_SET_Handle *new_set;
264
265   msg = (struct GNUNET_SET_CopyLazyResponseMessage *) mh;
266
267   req = set->copy_req_head;
268
269   if (NULL == req)
270   {
271     /* Service sent us unsolicited lazy copy response */
272     GNUNET_break (0);
273     return;
274   }
275   
276   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
277                                set->copy_req_tail,
278                                req);
279
280   
281   // We pass none as operation here, since it doesn't matter when
282   // cloning.
283   new_set = create_internal (set->cfg, GNUNET_SET_OPERATION_NONE, &msg->cookie);
284
285   req->cb (req->cls, new_set);
286
287   GNUNET_free (req);
288 }
289
290
291 /**
292  * Handle element for iteration over the set.  Notifies the
293  * iterator and sends an acknowledgement to the service.
294  *
295  * @param cls the `struct GNUNET_SET_Handle *`
296  * @param mh the message
297  */
298 static void
299 handle_iter_element (void *cls,
300                      const struct GNUNET_MessageHeader *mh)
301 {
302   struct GNUNET_SET_Handle *set = cls;
303   GNUNET_SET_ElementIterator iter = set->iterator;
304   struct GNUNET_SET_Element element;
305   const struct GNUNET_SET_IterResponseMessage *msg;
306   struct GNUNET_SET_IterAckMessage *ack_msg;
307   struct GNUNET_MQ_Envelope *ev;
308   uint16_t msize;
309
310   msize = ntohs (mh->size);
311   if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
312   {
313     /* message malformed */
314     GNUNET_break (0);
315     set->iterator = NULL;
316     set->iteration_id++;
317     iter (set->iterator_cls,
318           NULL);
319     iter = NULL;
320   }
321   msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
322   if (set->iteration_id != ntohs (msg->iteration_id))
323   {
324     /* element from a previous iteration, skip! */
325     iter = NULL;
326   }
327   if (NULL != iter)
328   {
329     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
330     element.element_type = htons (msg->element_type);
331     element.data = &msg[1];
332     iter (set->iterator_cls,
333           &element);
334   }
335   ev = GNUNET_MQ_msg (ack_msg,
336                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
337   ack_msg->send_more = htonl ((NULL != iter));
338   GNUNET_MQ_send (set->mq, ev);
339 }
340
341
342 /**
343  * Handle message signalling conclusion of iteration over the set.
344  * Notifies the iterator that we are done.
345  *
346  * @param cls the set
347  * @param mh the message
348  */
349 static void
350 handle_iter_done (void *cls,
351                   const struct GNUNET_MessageHeader *mh)
352 {
353   struct GNUNET_SET_Handle *set = cls;
354   GNUNET_SET_ElementIterator iter = set->iterator;
355
356   if (NULL == iter)
357     return;
358   set->iterator = NULL;
359   set->iteration_id++;
360   iter (set->iterator_cls,
361         NULL);
362 }
363
364
365 /**
366  * Handle result message for a set operation.
367  *
368  * @param cls the set
369  * @param mh the message
370  */
371 static void
372 handle_result (void *cls,
373                const struct GNUNET_MessageHeader *mh)
374 {
375   struct GNUNET_SET_Handle *set = cls;
376   const struct GNUNET_SET_ResultMessage *msg;
377   struct GNUNET_SET_OperationHandle *oh;
378   struct GNUNET_SET_Element e;
379   enum GNUNET_SET_Status result_status;
380
381   msg = (const struct GNUNET_SET_ResultMessage *) mh;
382   GNUNET_assert (NULL != set->mq);
383   result_status = ntohs (msg->result_status);
384   LOG (GNUNET_ERROR_TYPE_DEBUG,
385        "Got result message with status %d\n",
386        result_status);
387
388   oh = GNUNET_MQ_assoc_get (set->mq,
389                             ntohl (msg->request_id));
390   if (NULL == oh)
391   {
392     /* 'oh' can be NULL if we canceled the operation, but the service
393        did not get the cancel message yet. */
394     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395                 "Ignoring result from canceled operation\n");
396     return;
397   }
398   if (GNUNET_SET_STATUS_OK != result_status)
399   {
400     /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
401      * and this is the last result message we get */
402     GNUNET_MQ_assoc_remove (set->mq,
403                             ntohl (msg->request_id));
404     GNUNET_CONTAINER_DLL_remove (set->ops_head,
405                                  set->ops_tail,
406                                  oh);
407     if ( (GNUNET_YES == set->destroy_requested) &&
408          (NULL == set->ops_head) )
409       GNUNET_SET_destroy (set);
410     if (NULL != oh->result_cb)
411       oh->result_cb (oh->result_cls,
412                      NULL,
413                      result_status);
414     switch (result_status)
415     {
416     case GNUNET_SET_STATUS_OK:
417       break;
418     case GNUNET_SET_STATUS_FAILURE:
419       oh->result_cb = NULL;
420       break;
421     case GNUNET_SET_STATUS_HALF_DONE:
422       break;
423     case GNUNET_SET_STATUS_DONE:
424       oh->result_cb = NULL;
425       break;
426     }
427     GNUNET_free (oh);
428     return;
429   }
430   e.data = &msg[1];
431   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
432   e.element_type = msg->element_type;
433   if (NULL != oh->result_cb)
434     oh->result_cb (oh->result_cls,
435                    &e,
436                    result_status);
437 }
438
439
440 /**
441  * Destroy the given set operation.
442  *
443  * @param oh set operation to destroy
444  */
445 static void
446 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
447 {
448   struct GNUNET_SET_Handle *set = oh->set;
449   struct GNUNET_SET_OperationHandle *h_assoc;
450
451   if (NULL != oh->conclude_mqm)
452     GNUNET_MQ_discard (oh->conclude_mqm);
453   /* is the operation already commited? */
454   if (NULL != set)
455   {
456     GNUNET_CONTAINER_DLL_remove (set->ops_head,
457                                  set->ops_tail,
458                                  oh);
459     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
460                                       oh->request_id);
461     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
462   }
463   GNUNET_free (oh);
464 }
465
466
467 /**
468  * Cancel the given set operation.  We need to send an explicit cancel
469  * message, as all operations one one set communicate using one
470  * handle.
471  *
472  * @param oh set operation to cancel
473  */
474 void
475 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
476 {
477   struct GNUNET_SET_Handle *set = oh->set;
478   struct GNUNET_SET_CancelMessage *m;
479   struct GNUNET_MQ_Envelope *mqm;
480
481   if (NULL != set)
482   {
483     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
484     m->request_id = htonl (oh->request_id);
485     GNUNET_MQ_send (set->mq, mqm);
486   }
487   set_operation_destroy (oh);
488   if ( (NULL != set) &&
489        (GNUNET_YES == set->destroy_requested) &&
490        (NULL == set->ops_head) )
491   {
492     LOG (GNUNET_ERROR_TYPE_DEBUG,
493          "Destroying set after operation cancel\n");
494     GNUNET_SET_destroy (set);
495   }
496 }
497
498
499 /**
500  * We encountered an error communicating with the set service while
501  * performing a set operation. Report to the application.
502  *
503  * @param cls the `struct GNUNET_SET_Handle`
504  * @param error error code
505  */
506 static void
507 handle_client_set_error (void *cls,
508                          enum GNUNET_MQ_Error error)
509 {
510   struct GNUNET_SET_Handle *set = cls;
511
512   LOG (GNUNET_ERROR_TYPE_DEBUG,
513        "Handling client set error %d\n",
514        error);
515   while (NULL != set->ops_head)
516   {
517     if (NULL != set->ops_head->result_cb)
518       set->ops_head->result_cb (set->ops_head->result_cls,
519                                 NULL,
520                                 GNUNET_SET_STATUS_FAILURE);
521     set_operation_destroy (set->ops_head);
522   }
523   set->invalid = GNUNET_YES;
524   if (GNUNET_YES == set->destroy_requested)
525   {
526     LOG (GNUNET_ERROR_TYPE_DEBUG,
527          "Destroying set after operation failure\n");
528     GNUNET_SET_destroy (set);
529   }
530 }
531
532
533 static struct GNUNET_SET_Handle *
534 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
535                  enum GNUNET_SET_OperationType op,
536                  uint32_t *cookie)
537 {
538   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
539     { &handle_result,
540       GNUNET_MESSAGE_TYPE_SET_RESULT,
541       0 },
542     { &handle_iter_element,
543       GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
544       0 },
545     { &handle_iter_done,
546       GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
547       sizeof (struct GNUNET_MessageHeader) },
548     { &handle_copy_lazy,
549       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
550       sizeof (struct GNUNET_SET_CopyLazyResponseMessage) },
551     GNUNET_MQ_HANDLERS_END
552   };
553   struct GNUNET_SET_Handle *set;
554   struct GNUNET_MQ_Envelope *mqm;
555   struct GNUNET_SET_CreateMessage *create_msg;
556   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
557
558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559               "Creating new set (operation %u)\n",
560               op);
561   set = GNUNET_new (struct GNUNET_SET_Handle);
562   set->client = GNUNET_CLIENT_connect ("set", cfg);
563   set->cfg = cfg;
564   if (NULL == set->client)
565   {
566     GNUNET_free (set);
567     return NULL;
568   }
569   set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
570                                                    mq_handlers,
571                                                    &handle_client_set_error,
572                                                    set);
573   GNUNET_assert (NULL != set->mq);
574
575   if (NULL == cookie)
576   {
577     mqm = GNUNET_MQ_msg (create_msg,
578                          GNUNET_MESSAGE_TYPE_SET_CREATE);
579     create_msg->operation = htonl (op);
580   }
581   else
582   {
583     mqm = GNUNET_MQ_msg (copy_msg,
584                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
585     copy_msg->cookie = *cookie;
586   }
587   GNUNET_MQ_send (set->mq, mqm);
588   return set;
589 }
590
591
592 /**
593  * Create an empty set, supporting the specified operation.
594  *
595  * @param cfg configuration to use for connecting to the
596  *        set service
597  * @param op operation supported by the set
598  *        Note that the operation has to be specified
599  *        beforehand, as certain set operations need to maintain
600  *        data structures spefific to the operation
601  * @return a handle to the set
602  */
603 struct GNUNET_SET_Handle *
604 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
605                    enum GNUNET_SET_OperationType op)
606 {
607   return create_internal (cfg, op, NULL);
608 }
609
610
611 /**
612  * Add an element to the given set.  After the element has been added
613  * (in the sense of being transmitted to the set service), @a cont
614  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
615  * queued.
616  *
617  * @param set set to add element to
618  * @param element element to add to the set
619  * @param cont continuation called after the element has been added
620  * @param cont_cls closure for @a cont
621  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
622  *         set is invalid (e.g. the set service crashed)
623  */
624 int
625 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
626                         const struct GNUNET_SET_Element *element,
627                         GNUNET_SET_Continuation cont,
628                         void *cont_cls)
629 {
630   struct GNUNET_MQ_Envelope *mqm;
631   struct GNUNET_SET_ElementMessage *msg;
632
633   if (GNUNET_YES == set->invalid)
634   {
635     if (NULL != cont)
636       cont (cont_cls);
637     return GNUNET_SYSERR;
638   }
639   mqm = GNUNET_MQ_msg_extra (msg, element->size,
640                              GNUNET_MESSAGE_TYPE_SET_ADD);
641   msg->element_type = element->element_type;
642   memcpy (&msg[1],
643           element->data,
644           element->size);
645   GNUNET_MQ_notify_sent (mqm,
646                          cont, cont_cls);
647   GNUNET_MQ_send (set->mq, mqm);
648   return GNUNET_OK;
649 }
650
651
652 /**
653  * Remove an element to the given set.  After the element has been
654  * removed (in the sense of the request being transmitted to the set
655  * service), @a cont will be called.  Multiple calls to
656  * GNUNET_SET_remove_element() can be queued
657  *
658  * @param set set to remove element from
659  * @param element element to remove from the set
660  * @param cont continuation called after the element has been removed
661  * @param cont_cls closure for @a cont
662  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
663  *         set is invalid (e.g. the set service crashed)
664  */
665 int
666 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
667                            const struct GNUNET_SET_Element *element,
668                            GNUNET_SET_Continuation cont,
669                            void *cont_cls)
670 {
671   struct GNUNET_MQ_Envelope *mqm;
672   struct GNUNET_SET_ElementMessage *msg;
673
674   if (GNUNET_YES == set->invalid)
675   {
676     if (NULL != cont)
677       cont (cont_cls);
678     return GNUNET_SYSERR;
679   }
680   mqm = GNUNET_MQ_msg_extra (msg,
681                              element->size,
682                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
683   msg->element_type = element->element_type;
684   memcpy (&msg[1],
685           element->data,
686           element->size);
687   GNUNET_MQ_notify_sent (mqm,
688                          cont, cont_cls);
689   GNUNET_MQ_send (set->mq, mqm);
690   return GNUNET_OK;
691 }
692
693
694 /**
695  * Destroy the set handle if no operations are left, mark the set
696  * for destruction otherwise.
697  *
698  * @param set set handle to destroy
699  */
700 void
701 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
702 {
703   /* destroying set while iterator is active is currently
704      not supported; we should expand the API to allow
705      clients to explicitly cancel the iteration! */
706   GNUNET_assert (NULL == set->iterator);
707   if (NULL != set->ops_head)
708   {
709     LOG (GNUNET_ERROR_TYPE_DEBUG,
710          "Set operations are pending, delaying set destruction\n");
711     set->destroy_requested = GNUNET_YES;
712     return;
713   }
714   LOG (GNUNET_ERROR_TYPE_DEBUG,
715        "Really destroying set\n");
716   if (NULL != set->client)
717   {
718     GNUNET_CLIENT_disconnect (set->client);
719     set->client = NULL;
720   }
721   if (NULL != set->mq)
722   {
723     GNUNET_MQ_destroy (set->mq);
724     set->mq = NULL;
725   }
726   GNUNET_free (set);
727 }
728
729
730 /**
731  * Prepare a set operation to be evaluated with another peer.
732  * The evaluation will not start until the client provides
733  * a local set with #GNUNET_SET_commit().
734  *
735  * @param other_peer peer with the other set
736  * @param app_id hash for the application using the set
737  * @param context_msg additional information for the request
738  * @param result_mode specified how results will be returned,
739  *        see `enum GNUNET_SET_ResultMode`.
740  * @param result_cb called on error or success
741  * @param result_cls closure for @e result_cb
742  * @return a handle to cancel the operation
743  */
744 struct GNUNET_SET_OperationHandle *
745 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
746                     const struct GNUNET_HashCode *app_id,
747                     const struct GNUNET_MessageHeader *context_msg,
748                     enum GNUNET_SET_ResultMode result_mode,
749                     GNUNET_SET_ResultIterator result_cb,
750                     void *result_cls)
751 {
752   struct GNUNET_MQ_Envelope *mqm;
753   struct GNUNET_SET_OperationHandle *oh;
754   struct GNUNET_SET_EvaluateMessage *msg;
755
756   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
757   oh->result_cb = result_cb;
758   oh->result_cls = result_cls;
759   mqm = GNUNET_MQ_msg_nested_mh (msg,
760                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
761                                  context_msg);
762   msg->app_id = *app_id;
763   msg->result_mode = htonl (result_mode);
764   msg->target_peer = *other_peer;
765   oh->conclude_mqm = mqm;
766   oh->request_id_addr = &msg->request_id;
767
768   return oh;
769 }
770
771
772 /**
773  * Connect to the set service in order to listen for requests.
774  *
775  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
776  * @param tc task context if invoked as a task, NULL otherwise
777  */
778 static void
779 listen_connect (void *cls,
780                 const struct GNUNET_SCHEDULER_TaskContext *tc);
781
782
783 /**
784  * Handle request message for a listen operation
785  *
786  * @param cls the listen handle
787  * @param mh the message
788  */
789 static void
790 handle_request (void *cls,
791                 const struct GNUNET_MessageHeader *mh)
792 {
793   struct GNUNET_SET_ListenHandle *lh = cls;
794   const struct GNUNET_SET_RequestMessage *msg;
795   struct GNUNET_SET_Request req;
796   const struct GNUNET_MessageHeader *context_msg;
797   uint16_t msize;
798   struct GNUNET_MQ_Envelope *mqm;
799   struct GNUNET_SET_RejectMessage *rmsg;
800
801   LOG (GNUNET_ERROR_TYPE_DEBUG,
802        "Processing incoming operation request\n");
803   msize = ntohs (mh->size);
804   if (msize < sizeof (struct GNUNET_SET_RequestMessage))
805   {
806     GNUNET_break (0);
807     GNUNET_CLIENT_disconnect (lh->client);
808     lh->client = NULL;
809     GNUNET_MQ_destroy (lh->mq);
810     lh->mq = NULL;
811     lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
812                                                        &listen_connect, lh);
813     lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
814     return;
815   }
816   /* we got another valid request => reset the backoff */
817   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
818   msg = (const struct GNUNET_SET_RequestMessage *) mh;
819   req.accept_id = ntohl (msg->accept_id);
820   req.accepted = GNUNET_NO;
821   context_msg = GNUNET_MQ_extract_nested_mh (msg);
822   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
823   lh->listen_cb (lh->listen_cls,
824                  &msg->peer_id,
825                  context_msg,
826                  &req);
827   if (GNUNET_YES == req.accepted)
828     return; /* the accept-case is handled in #GNUNET_SET_accept() */
829   LOG (GNUNET_ERROR_TYPE_DEBUG,
830        "Rejecting request\n");
831   mqm = GNUNET_MQ_msg (rmsg,
832                        GNUNET_MESSAGE_TYPE_SET_REJECT);
833   rmsg->accept_reject_id = msg->accept_id;
834   GNUNET_MQ_send (lh->mq, mqm);
835 }
836
837
838 /**
839  * Our connection with the set service encountered an error,
840  * re-initialize with exponential back-off.
841  *
842  * @param cls the `struct GNUNET_SET_ListenHandle *`
843  * @param error reason for the disconnect
844  */
845 static void
846 handle_client_listener_error (void *cls,
847                               enum GNUNET_MQ_Error error)
848 {
849   struct GNUNET_SET_ListenHandle *lh = cls;
850
851   LOG (GNUNET_ERROR_TYPE_DEBUG,
852        "Listener broke down (%d), re-connecting\n",
853        (int) error);
854   GNUNET_CLIENT_disconnect (lh->client);
855   lh->client = NULL;
856   GNUNET_MQ_destroy (lh->mq);
857   lh->mq = NULL;
858   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
859                                                      &listen_connect, lh);
860   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
861 }
862
863
864 /**
865  * Connect to the set service in order to listen for requests.
866  *
867  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
868  * @param tc task context if invoked as a task, NULL otherwise
869  */
870 static void
871 listen_connect (void *cls,
872                 const struct GNUNET_SCHEDULER_TaskContext *tc)
873 {
874   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
875     { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
876     GNUNET_MQ_HANDLERS_END
877   };
878   struct GNUNET_SET_ListenHandle *lh = cls;
879   struct GNUNET_MQ_Envelope *mqm;
880   struct GNUNET_SET_ListenMessage *msg;
881
882   if ( (NULL != tc) &&
883        (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
884   {
885     LOG (GNUNET_ERROR_TYPE_DEBUG,
886          "Listener not reconnecting due to shutdown\n");
887     return;
888   }
889   lh->reconnect_task = NULL;
890   GNUNET_assert (NULL == lh->client);
891   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
892   if (NULL == lh->client)
893     return;
894   GNUNET_assert (NULL == lh->mq);
895   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
896                                                   mq_handlers,
897                                                   &handle_client_listener_error, lh);
898   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
899   msg->operation = htonl (lh->operation);
900   msg->app_id = lh->app_id;
901   GNUNET_MQ_send (lh->mq, mqm);
902 }
903
904
905 /**
906  * Wait for set operation requests for the given application id
907  *
908  * @param cfg configuration to use for connecting to
909  *            the set service, needs to be valid for the lifetime of the listen handle
910  * @param operation operation we want to listen for
911  * @param app_id id of the application that handles set operation requests
912  * @param listen_cb called for each incoming request matching the operation
913  *                  and application id
914  * @param listen_cls handle for @a listen_cb
915  * @return a handle that can be used to cancel the listen operation
916  */
917 struct GNUNET_SET_ListenHandle *
918 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
919                    enum GNUNET_SET_OperationType operation,
920                    const struct GNUNET_HashCode *app_id,
921                    GNUNET_SET_ListenCallback listen_cb,
922                    void *listen_cls)
923 {
924   struct GNUNET_SET_ListenHandle *lh;
925
926   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
927   lh->listen_cb = listen_cb;
928   lh->listen_cls = listen_cls;
929   lh->cfg = cfg;
930   lh->operation = operation;
931   lh->app_id = *app_id;
932   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
933   listen_connect (lh, NULL);
934   if (NULL == lh->client)
935   {
936     GNUNET_free (lh);
937     return NULL;
938   }
939   return lh;
940 }
941
942
943 /**
944  * Cancel the given listen operation.
945  *
946  * @param lh handle for the listen operation
947  */
948 void
949 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
950 {
951   LOG (GNUNET_ERROR_TYPE_DEBUG,
952        "Canceling listener\n");
953   if (NULL != lh->mq)
954   {
955     GNUNET_MQ_destroy (lh->mq);
956     lh->mq = NULL;
957   }
958   if (NULL != lh->client)
959   {
960     GNUNET_CLIENT_disconnect (lh->client);
961     lh->client = NULL;
962   }
963   if (NULL != lh->reconnect_task)
964   {
965     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
966     lh->reconnect_task = NULL;
967   }
968   GNUNET_free (lh);
969 }
970
971
972 /**
973  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
974  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
975  * afterwards.
976  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
977  * and to begin the exchange with the remote peer.
978  *
979  * @param request request to accept
980  * @param result_mode specified how results will be returned,
981  *        see `enum GNUNET_SET_ResultMode`.
982  * @param result_cb callback for the results
983  * @param result_cls closure for @a result_cb
984  * @return a handle to cancel the operation
985  */
986 struct GNUNET_SET_OperationHandle *
987 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
988                    enum GNUNET_SET_ResultMode result_mode,
989                    GNUNET_SET_ResultIterator result_cb,
990                    void *result_cls)
991 {
992   struct GNUNET_MQ_Envelope *mqm;
993   struct GNUNET_SET_OperationHandle *oh;
994   struct GNUNET_SET_AcceptMessage *msg;
995
996   GNUNET_assert (GNUNET_NO == request->accepted);
997   request->accepted = GNUNET_YES;
998   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
999   msg->accept_reject_id = htonl (request->accept_id);
1000   msg->result_mode = htonl (result_mode);
1001   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1002   oh->result_cb = result_cb;
1003   oh->result_cls = result_cls;
1004   oh->conclude_mqm = mqm;
1005   oh->request_id_addr = &msg->request_id;
1006   return oh;
1007 }
1008
1009
1010 /**
1011  * Commit a set to be used with a set operation.
1012  * This function is called once we have fully constructed
1013  * the set that we want to use for the operation.  At this
1014  * time, the P2P protocol can then begin to exchange the
1015  * set information and call the result callback with the
1016  * result information.
1017  *
1018  * @param oh handle to the set operation
1019  * @param set the set to use for the operation
1020  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1021  *         set is invalid (e.g. the set service crashed)
1022  */
1023 int
1024 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1025                    struct GNUNET_SET_Handle *set)
1026 {
1027   GNUNET_assert (NULL == oh->set);
1028   if (GNUNET_YES == set->invalid)
1029     return GNUNET_SYSERR;
1030   GNUNET_assert (NULL != oh->conclude_mqm);
1031   oh->set = set;
1032   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1033                                set->ops_tail,
1034                                oh);
1035   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
1036   *oh->request_id_addr = htonl (oh->request_id);
1037   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
1038   oh->conclude_mqm = NULL;
1039   oh->request_id_addr = NULL;
1040   return GNUNET_OK;
1041 }
1042
1043
1044 /**
1045  * Iterate over all elements in the given set.  Note that this
1046  * operation involves transferring every element of the set from the
1047  * service to the client, and is thus costly.
1048  *
1049  * @param set the set to iterate over
1050  * @param iter the iterator to call for each element
1051  * @param iter_cls closure for @a iter
1052  * @return #GNUNET_YES if the iteration started successfuly,
1053  *         #GNUNET_NO if another iteration is active
1054  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1055  */
1056 int
1057 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1058                     GNUNET_SET_ElementIterator iter,
1059                     void *iter_cls)
1060 {
1061   struct GNUNET_MQ_Envelope *ev;
1062
1063   GNUNET_assert (NULL != iter);
1064   if (GNUNET_YES == set->invalid)
1065     return GNUNET_SYSERR;
1066   if (NULL != set->iterator)
1067     return GNUNET_NO;
1068   LOG (GNUNET_ERROR_TYPE_DEBUG,
1069        "Iterating over set\n");
1070   set->iterator = iter;
1071   set->iterator_cls = iter_cls;
1072   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1073   GNUNET_MQ_send (set->mq, ev);
1074   return GNUNET_YES;
1075 }
1076
1077
1078 void
1079 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1080                       GNUNET_SET_CopyReadyCallback cb,
1081                       void *cls)
1082 {
1083   struct GNUNET_MQ_Envelope *ev;
1084   struct SetCopyRequest *req;
1085
1086   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1087   GNUNET_MQ_send (set->mq, ev);
1088
1089   req = GNUNET_new (struct SetCopyRequest);
1090   req->cb = cb;
1091   req->cls = cls;
1092   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1093                                set->copy_req_tail,
1094                                req);
1095 }
1096
1097
1098 /* end of set_api.c */