16aa87cd0434c1c519b6d1c3e2a4dc8c11213ca9
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_set_service.h"
31 #include "set.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35
36 struct SetCopyRequest
37 {
38   struct SetCopyRequest *next;
39
40   struct SetCopyRequest *prev;
41
42   void *cls;
43
44   GNUNET_SET_CopyReadyCallback cb;
45 };
46
47 /**
48  * Opaque handle to a set.
49  */
50 struct GNUNET_SET_Handle
51 {
52   /**
53    * Client connected to the set service.
54    */
55   struct GNUNET_CLIENT_Connection *client;
56
57   /**
58    * Message queue for @e client.
59    */
60   struct GNUNET_MQ_Handle *mq;
61
62   /**
63    * Linked list of operations on the set.
64    */
65   struct GNUNET_SET_OperationHandle *ops_head;
66
67   /**
68    * Linked list of operations on the set.
69    */
70   struct GNUNET_SET_OperationHandle *ops_tail;
71
72   /**
73    * Callback for the current iteration over the set,
74    * NULL if no iterator is active.
75    */
76   GNUNET_SET_ElementIterator iterator;
77
78   /**
79    * Closure for @e iterator
80    */
81   void *iterator_cls;
82
83   /**
84    * Should the set be destroyed once all operations are gone?
85    */
86   int destroy_requested;
87
88   /**
89    * Has the set become invalid (e.g. service died)?
90    */
91   int invalid;
92
93   /**
94    * Both client and service count the number of iterators
95    * created so far to match replies with iterators.
96    */
97   uint16_t iteration_id;
98
99   /**
100    * Configuration, needed when creating (lazy) copies.
101    */
102   const struct GNUNET_CONFIGURATION_Handle *cfg;
103
104   /**
105    * Doubly linked list of copy requests.
106    */
107   struct SetCopyRequest *copy_req_head;
108
109   /**
110    * Doubly linked list of copy requests.
111    */
112   struct SetCopyRequest *copy_req_tail;
113 };
114
115
116 /**
117  * Handle for a set operation request from another peer.
118  */
119 struct GNUNET_SET_Request
120 {
121   /**
122    * Id of the request, used to identify the request when
123    * accepting/rejecting it.
124    */
125   uint32_t accept_id;
126
127   /**
128    * Has the request been accepted already?
129    * #GNUNET_YES/#GNUNET_NO
130    */
131   int accepted;
132 };
133
134
135 /**
136  * Handle to an operation.  Only known to the service after committing
137  * the handle with a set.
138  */
139 struct GNUNET_SET_OperationHandle
140 {
141   /**
142    * Function to be called when we have a result,
143    * or an error.
144    */
145   GNUNET_SET_ResultIterator result_cb;
146
147   /**
148    * Closure for @e result_cb.
149    */
150   void *result_cls;
151
152   /**
153    * Local set used for the operation,
154    * NULL if no set has been provided by conclude yet.
155    */
156   struct GNUNET_SET_Handle *set;
157
158   /**
159    * Message sent to the server on calling conclude,
160    * NULL if conclude has been called.
161    */
162   struct GNUNET_MQ_Envelope *conclude_mqm;
163
164   /**
165    * Address of the request if in the conclude message,
166    * used to patch the request id into the message when the set is known.
167    */
168   uint32_t *request_id_addr;
169
170   /**
171    * Handles are kept in a linked list.
172    */
173   struct GNUNET_SET_OperationHandle *prev;
174
175   /**
176    * Handles are kept in a linked list.
177    */
178   struct GNUNET_SET_OperationHandle *next;
179
180   /**
181    * Request ID to identify the operation within the set.
182    */
183   uint32_t request_id;
184 };
185
186
187 /**
188  * Opaque handle to a listen operation.
189  */
190 struct GNUNET_SET_ListenHandle
191 {
192   /**
193    * Connection to the service.
194    */
195   struct GNUNET_CLIENT_Connection *client;
196
197   /**
198    * Message queue for the client.
199    */
200   struct GNUNET_MQ_Handle* mq;
201
202   /**
203    * Configuration handle for the listener, stored
204    * here to be able to reconnect transparently on
205    * connection failure.
206    */
207   const struct GNUNET_CONFIGURATION_Handle *cfg;
208
209   /**
210    * Function to call on a new incoming request,
211    * or on error.
212    */
213   GNUNET_SET_ListenCallback listen_cb;
214
215   /**
216    * Closure for @e listen_cb.
217    */
218   void *listen_cls;
219
220   /**
221    * Application ID we listen for.
222    */
223   struct GNUNET_HashCode app_id;
224
225   /**
226    * Time to wait until we try to reconnect on failure.
227    */
228   struct GNUNET_TIME_Relative reconnect_backoff;
229
230   /**
231    * Task for reconnecting when the listener fails.
232    */
233   struct GNUNET_SCHEDULER_Task * reconnect_task;
234
235   /**
236    * Operation we listen for.
237    */
238   enum GNUNET_SET_OperationType operation;
239 };
240
241
242 /* mutual recursion with handle_copy_lazy */
243 static struct GNUNET_SET_Handle *
244 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
245                  enum GNUNET_SET_OperationType op,
246                  uint32_t *cookie);
247
248
249 /**
250  * Handle element for iteration over the set.  Notifies the
251  * iterator and sends an acknowledgement to the service.
252  *
253  * @param cls the `struct GNUNET_SET_Handle *`
254  * @param mh the message
255  */
256 static void
257 handle_copy_lazy (void *cls,
258                   const struct GNUNET_MessageHeader *mh)
259 {
260   struct GNUNET_SET_CopyLazyResponseMessage *msg;
261   struct GNUNET_SET_Handle *set = cls;
262   struct SetCopyRequest *req;
263   struct GNUNET_SET_Handle *new_set;
264
265   msg = (struct GNUNET_SET_CopyLazyResponseMessage *) mh;
266
267   req = set->copy_req_head;
268
269   if (NULL == req)
270   {
271     /* Service sent us unsolicited lazy copy response */
272     GNUNET_break (0);
273     return;
274   }
275
276   LOG (GNUNET_ERROR_TYPE_DEBUG,
277        "Handling response to lazy copy\n");
278   
279   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
280                                set->copy_req_tail,
281                                req);
282
283   
284   // We pass none as operation here, since it doesn't matter when
285   // cloning.
286   new_set = create_internal (set->cfg, GNUNET_SET_OPERATION_NONE, &msg->cookie);
287
288   req->cb (req->cls, new_set);
289
290   GNUNET_free (req);
291 }
292
293
294 /**
295  * Handle element for iteration over the set.  Notifies the
296  * iterator and sends an acknowledgement to the service.
297  *
298  * @param cls the `struct GNUNET_SET_Handle *`
299  * @param mh the message
300  */
301 static void
302 handle_iter_element (void *cls,
303                      const struct GNUNET_MessageHeader *mh)
304 {
305   struct GNUNET_SET_Handle *set = cls;
306   GNUNET_SET_ElementIterator iter = set->iterator;
307   struct GNUNET_SET_Element element;
308   const struct GNUNET_SET_IterResponseMessage *msg;
309   struct GNUNET_SET_IterAckMessage *ack_msg;
310   struct GNUNET_MQ_Envelope *ev;
311   uint16_t msize;
312
313   msize = ntohs (mh->size);
314   if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
315   {
316     /* message malformed */
317     GNUNET_break (0);
318     set->iterator = NULL;
319     set->iteration_id++;
320     iter (set->iterator_cls,
321           NULL);
322     iter = NULL;
323   }
324   msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
325   if (set->iteration_id != ntohs (msg->iteration_id))
326   {
327     /* element from a previous iteration, skip! */
328     iter = NULL;
329   }
330   if (NULL != iter)
331   {
332     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
333     element.element_type = htons (msg->element_type);
334     element.data = &msg[1];
335     iter (set->iterator_cls,
336           &element);
337   }
338   ev = GNUNET_MQ_msg (ack_msg,
339                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
340   ack_msg->send_more = htonl ((NULL != iter));
341   GNUNET_MQ_send (set->mq, ev);
342 }
343
344
345 /**
346  * Handle message signalling conclusion of iteration over the set.
347  * Notifies the iterator that we are done.
348  *
349  * @param cls the set
350  * @param mh the message
351  */
352 static void
353 handle_iter_done (void *cls,
354                   const struct GNUNET_MessageHeader *mh)
355 {
356   struct GNUNET_SET_Handle *set = cls;
357   GNUNET_SET_ElementIterator iter = set->iterator;
358
359   if (NULL == iter)
360     return;
361   set->iterator = NULL;
362   set->iteration_id++;
363   iter (set->iterator_cls,
364         NULL);
365 }
366
367
368 /**
369  * Handle result message for a set operation.
370  *
371  * @param cls the set
372  * @param mh the message
373  */
374 static void
375 handle_result (void *cls,
376                const struct GNUNET_MessageHeader *mh)
377 {
378   struct GNUNET_SET_Handle *set = cls;
379   const struct GNUNET_SET_ResultMessage *msg;
380   struct GNUNET_SET_OperationHandle *oh;
381   struct GNUNET_SET_Element e;
382   enum GNUNET_SET_Status result_status;
383
384   msg = (const struct GNUNET_SET_ResultMessage *) mh;
385   GNUNET_assert (NULL != set->mq);
386   result_status = ntohs (msg->result_status);
387   LOG (GNUNET_ERROR_TYPE_DEBUG,
388        "Got result message with status %d\n",
389        result_status);
390
391   oh = GNUNET_MQ_assoc_get (set->mq,
392                             ntohl (msg->request_id));
393   if (NULL == oh)
394   {
395     /* 'oh' can be NULL if we canceled the operation, but the service
396        did not get the cancel message yet. */
397     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
398                 "Ignoring result from canceled operation\n");
399     return;
400   }
401
402   switch (result_status)
403   {
404     case GNUNET_SET_STATUS_OK:
405     case GNUNET_SET_STATUS_ADD_LOCAL:
406     case GNUNET_SET_STATUS_ADD_REMOTE:
407       goto do_element;
408     case GNUNET_SET_STATUS_FAILURE:
409     case GNUNET_SET_STATUS_DONE:
410       goto do_final;
411     case GNUNET_SET_STATUS_HALF_DONE:
412       /* not used anymore */
413       GNUNET_assert (0);
414   }
415
416 do_final:
417   LOG (GNUNET_ERROR_TYPE_DEBUG,
418        "Treating result as final status\n");
419   GNUNET_MQ_assoc_remove (set->mq,
420                           ntohl (msg->request_id));
421   GNUNET_CONTAINER_DLL_remove (set->ops_head,
422                                set->ops_tail,
423                                oh);
424   if (NULL != oh->result_cb)
425   {
426     oh->result_cb (oh->result_cls,
427                    NULL,
428                    result_status);
429   }
430   else
431   {
432     LOG (GNUNET_ERROR_TYPE_DEBUG,
433          "No callback for final status\n");
434   }
435   if ( (GNUNET_YES == set->destroy_requested) &&
436        (NULL == set->ops_head) )
437     GNUNET_SET_destroy (set);
438   GNUNET_free (oh);
439   return;
440
441 do_element:
442   LOG (GNUNET_ERROR_TYPE_DEBUG,
443        "Treating result as element\n");
444   e.data = &msg[1];
445   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
446   e.element_type = msg->element_type;
447   if (NULL != oh->result_cb)
448     oh->result_cb (oh->result_cls,
449                    &e,
450                    result_status);
451 }
452
453
454 /**
455  * Destroy the given set operation.
456  *
457  * @param oh set operation to destroy
458  */
459 static void
460 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
461 {
462   struct GNUNET_SET_Handle *set = oh->set;
463   struct GNUNET_SET_OperationHandle *h_assoc;
464
465   if (NULL != oh->conclude_mqm)
466     GNUNET_MQ_discard (oh->conclude_mqm);
467   /* is the operation already commited? */
468   if (NULL != set)
469   {
470     GNUNET_CONTAINER_DLL_remove (set->ops_head,
471                                  set->ops_tail,
472                                  oh);
473     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
474                                       oh->request_id);
475     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
476   }
477   GNUNET_free (oh);
478 }
479
480
481 /**
482  * Cancel the given set operation.  We need to send an explicit cancel
483  * message, as all operations one one set communicate using one
484  * handle.
485  *
486  * @param oh set operation to cancel
487  */
488 void
489 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
490 {
491   struct GNUNET_SET_Handle *set = oh->set;
492   struct GNUNET_SET_CancelMessage *m;
493   struct GNUNET_MQ_Envelope *mqm;
494
495   if (NULL != set)
496   {
497     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
498     m->request_id = htonl (oh->request_id);
499     GNUNET_MQ_send (set->mq, mqm);
500   }
501   set_operation_destroy (oh);
502   if ( (NULL != set) &&
503        (GNUNET_YES == set->destroy_requested) &&
504        (NULL == set->ops_head) )
505   {
506     LOG (GNUNET_ERROR_TYPE_DEBUG,
507          "Destroying set after operation cancel\n");
508     GNUNET_SET_destroy (set);
509   }
510 }
511
512
513 /**
514  * We encountered an error communicating with the set service while
515  * performing a set operation. Report to the application.
516  *
517  * @param cls the `struct GNUNET_SET_Handle`
518  * @param error error code
519  */
520 static void
521 handle_client_set_error (void *cls,
522                          enum GNUNET_MQ_Error error)
523 {
524   struct GNUNET_SET_Handle *set = cls;
525
526   LOG (GNUNET_ERROR_TYPE_DEBUG,
527        "Handling client set error %d\n",
528        error);
529   while (NULL != set->ops_head)
530   {
531     if (NULL != set->ops_head->result_cb)
532       set->ops_head->result_cb (set->ops_head->result_cls,
533                                 NULL,
534                                 GNUNET_SET_STATUS_FAILURE);
535     set_operation_destroy (set->ops_head);
536   }
537   set->invalid = GNUNET_YES;
538   if (GNUNET_YES == set->destroy_requested)
539   {
540     LOG (GNUNET_ERROR_TYPE_DEBUG,
541          "Destroying set after operation failure\n");
542     GNUNET_SET_destroy (set);
543   }
544 }
545
546
547 static struct GNUNET_SET_Handle *
548 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
549                  enum GNUNET_SET_OperationType op,
550                  uint32_t *cookie)
551 {
552   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
553     { &handle_result,
554       GNUNET_MESSAGE_TYPE_SET_RESULT,
555       0 },
556     { &handle_iter_element,
557       GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
558       0 },
559     { &handle_iter_done,
560       GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
561       sizeof (struct GNUNET_MessageHeader) },
562     { &handle_copy_lazy,
563       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
564       sizeof (struct GNUNET_SET_CopyLazyResponseMessage) },
565     GNUNET_MQ_HANDLERS_END
566   };
567   struct GNUNET_SET_Handle *set;
568   struct GNUNET_MQ_Envelope *mqm;
569   struct GNUNET_SET_CreateMessage *create_msg;
570   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
571
572   set = GNUNET_new (struct GNUNET_SET_Handle);
573   set->client = GNUNET_CLIENT_connect ("set", cfg);
574   set->cfg = cfg;
575   if (NULL == set->client)
576   {
577     GNUNET_free (set);
578     return NULL;
579   }
580   set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
581                                                    mq_handlers,
582                                                    &handle_client_set_error,
583                                                    set);
584   GNUNET_assert (NULL != set->mq);
585
586   if (NULL == cookie)
587   {
588     LOG (GNUNET_ERROR_TYPE_DEBUG,
589          "Creating new set (operation %u)\n",
590          op);
591     mqm = GNUNET_MQ_msg (create_msg,
592                          GNUNET_MESSAGE_TYPE_SET_CREATE);
593     create_msg->operation = htonl (op);
594   }
595   else
596   {
597     LOG (GNUNET_ERROR_TYPE_DEBUG,
598          "Creating new set (lazy copy)\n",
599          op);
600     mqm = GNUNET_MQ_msg (copy_msg,
601                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
602     copy_msg->cookie = *cookie;
603   }
604   GNUNET_MQ_send (set->mq, mqm);
605   return set;
606 }
607
608
609 /**
610  * Create an empty set, supporting the specified operation.
611  *
612  * @param cfg configuration to use for connecting to the
613  *        set service
614  * @param op operation supported by the set
615  *        Note that the operation has to be specified
616  *        beforehand, as certain set operations need to maintain
617  *        data structures spefific to the operation
618  * @return a handle to the set
619  */
620 struct GNUNET_SET_Handle *
621 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
622                    enum GNUNET_SET_OperationType op)
623 {
624   return create_internal (cfg, op, NULL);
625 }
626
627
628 /**
629  * Add an element to the given set.  After the element has been added
630  * (in the sense of being transmitted to the set service), @a cont
631  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
632  * queued.
633  *
634  * @param set set to add element to
635  * @param element element to add to the set
636  * @param cont continuation called after the element has been added
637  * @param cont_cls closure for @a cont
638  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
639  *         set is invalid (e.g. the set service crashed)
640  */
641 int
642 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
643                         const struct GNUNET_SET_Element *element,
644                         GNUNET_SET_Continuation cont,
645                         void *cont_cls)
646 {
647   struct GNUNET_MQ_Envelope *mqm;
648   struct GNUNET_SET_ElementMessage *msg;
649
650   if (GNUNET_YES == set->invalid)
651   {
652     if (NULL != cont)
653       cont (cont_cls);
654     return GNUNET_SYSERR;
655   }
656   mqm = GNUNET_MQ_msg_extra (msg, element->size,
657                              GNUNET_MESSAGE_TYPE_SET_ADD);
658   msg->element_type = element->element_type;
659   memcpy (&msg[1],
660           element->data,
661           element->size);
662   GNUNET_MQ_notify_sent (mqm,
663                          cont, cont_cls);
664   GNUNET_MQ_send (set->mq, mqm);
665   return GNUNET_OK;
666 }
667
668
669 /**
670  * Remove an element to the given set.  After the element has been
671  * removed (in the sense of the request being transmitted to the set
672  * service), @a cont will be called.  Multiple calls to
673  * GNUNET_SET_remove_element() can be queued
674  *
675  * @param set set to remove element from
676  * @param element element to remove from the set
677  * @param cont continuation called after the element has been removed
678  * @param cont_cls closure for @a cont
679  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
680  *         set is invalid (e.g. the set service crashed)
681  */
682 int
683 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
684                            const struct GNUNET_SET_Element *element,
685                            GNUNET_SET_Continuation cont,
686                            void *cont_cls)
687 {
688   struct GNUNET_MQ_Envelope *mqm;
689   struct GNUNET_SET_ElementMessage *msg;
690
691   if (GNUNET_YES == set->invalid)
692   {
693     if (NULL != cont)
694       cont (cont_cls);
695     return GNUNET_SYSERR;
696   }
697   mqm = GNUNET_MQ_msg_extra (msg,
698                              element->size,
699                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
700   msg->element_type = element->element_type;
701   memcpy (&msg[1],
702           element->data,
703           element->size);
704   GNUNET_MQ_notify_sent (mqm,
705                          cont, cont_cls);
706   GNUNET_MQ_send (set->mq, mqm);
707   return GNUNET_OK;
708 }
709
710
711 /**
712  * Destroy the set handle if no operations are left, mark the set
713  * for destruction otherwise.
714  *
715  * @param set set handle to destroy
716  */
717 void
718 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
719 {
720   /* destroying set while iterator is active is currently
721      not supported; we should expand the API to allow
722      clients to explicitly cancel the iteration! */
723   GNUNET_assert (NULL == set->iterator);
724   if (NULL != set->ops_head)
725   {
726     LOG (GNUNET_ERROR_TYPE_DEBUG,
727          "Set operations are pending, delaying set destruction\n");
728     set->destroy_requested = GNUNET_YES;
729     return;
730   }
731   LOG (GNUNET_ERROR_TYPE_DEBUG,
732        "Really destroying set\n");
733   if (NULL != set->client)
734   {
735     GNUNET_CLIENT_disconnect (set->client);
736     set->client = NULL;
737   }
738   if (NULL != set->mq)
739   {
740     GNUNET_MQ_destroy (set->mq);
741     set->mq = NULL;
742   }
743   GNUNET_free (set);
744 }
745
746
747 /**
748  * Prepare a set operation to be evaluated with another peer.
749  * The evaluation will not start until the client provides
750  * a local set with #GNUNET_SET_commit().
751  *
752  * @param other_peer peer with the other set
753  * @param app_id hash for the application using the set
754  * @param context_msg additional information for the request
755  * @param result_mode specified how results will be returned,
756  *        see `enum GNUNET_SET_ResultMode`.
757  * @param result_cb called on error or success
758  * @param result_cls closure for @e result_cb
759  * @return a handle to cancel the operation
760  */
761 struct GNUNET_SET_OperationHandle *
762 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
763                     const struct GNUNET_HashCode *app_id,
764                     const struct GNUNET_MessageHeader *context_msg,
765                     enum GNUNET_SET_ResultMode result_mode,
766                     GNUNET_SET_ResultIterator result_cb,
767                     void *result_cls)
768 {
769   struct GNUNET_MQ_Envelope *mqm;
770   struct GNUNET_SET_OperationHandle *oh;
771   struct GNUNET_SET_EvaluateMessage *msg;
772
773   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
774   oh->result_cb = result_cb;
775   oh->result_cls = result_cls;
776   mqm = GNUNET_MQ_msg_nested_mh (msg,
777                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
778                                  context_msg);
779   msg->app_id = *app_id;
780   msg->result_mode = htonl (result_mode);
781   msg->target_peer = *other_peer;
782   oh->conclude_mqm = mqm;
783   oh->request_id_addr = &msg->request_id;
784
785   return oh;
786 }
787
788
789 /**
790  * Connect to the set service in order to listen for requests.
791  *
792  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
793  * @param tc task context if invoked as a task, NULL otherwise
794  */
795 static void
796 listen_connect (void *cls,
797                 const struct GNUNET_SCHEDULER_TaskContext *tc);
798
799
800 /**
801  * Handle request message for a listen operation
802  *
803  * @param cls the listen handle
804  * @param mh the message
805  */
806 static void
807 handle_request (void *cls,
808                 const struct GNUNET_MessageHeader *mh)
809 {
810   struct GNUNET_SET_ListenHandle *lh = cls;
811   const struct GNUNET_SET_RequestMessage *msg;
812   struct GNUNET_SET_Request req;
813   const struct GNUNET_MessageHeader *context_msg;
814   uint16_t msize;
815   struct GNUNET_MQ_Envelope *mqm;
816   struct GNUNET_SET_RejectMessage *rmsg;
817
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819        "Processing incoming operation request\n");
820   msize = ntohs (mh->size);
821   if (msize < sizeof (struct GNUNET_SET_RequestMessage))
822   {
823     GNUNET_break (0);
824     GNUNET_CLIENT_disconnect (lh->client);
825     lh->client = NULL;
826     GNUNET_MQ_destroy (lh->mq);
827     lh->mq = NULL;
828     lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
829                                                        &listen_connect, lh);
830     lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
831     return;
832   }
833   /* we got another valid request => reset the backoff */
834   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
835   msg = (const struct GNUNET_SET_RequestMessage *) mh;
836   req.accept_id = ntohl (msg->accept_id);
837   req.accepted = GNUNET_NO;
838   context_msg = GNUNET_MQ_extract_nested_mh (msg);
839   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
840   lh->listen_cb (lh->listen_cls,
841                  &msg->peer_id,
842                  context_msg,
843                  &req);
844   if (GNUNET_YES == req.accepted)
845     return; /* the accept-case is handled in #GNUNET_SET_accept() */
846   LOG (GNUNET_ERROR_TYPE_DEBUG,
847        "Rejecting request\n");
848   mqm = GNUNET_MQ_msg (rmsg,
849                        GNUNET_MESSAGE_TYPE_SET_REJECT);
850   rmsg->accept_reject_id = msg->accept_id;
851   GNUNET_MQ_send (lh->mq, mqm);
852 }
853
854
855 /**
856  * Our connection with the set service encountered an error,
857  * re-initialize with exponential back-off.
858  *
859  * @param cls the `struct GNUNET_SET_ListenHandle *`
860  * @param error reason for the disconnect
861  */
862 static void
863 handle_client_listener_error (void *cls,
864                               enum GNUNET_MQ_Error error)
865 {
866   struct GNUNET_SET_ListenHandle *lh = cls;
867
868   LOG (GNUNET_ERROR_TYPE_DEBUG,
869        "Listener broke down (%d), re-connecting\n",
870        (int) error);
871   GNUNET_CLIENT_disconnect (lh->client);
872   lh->client = NULL;
873   GNUNET_MQ_destroy (lh->mq);
874   lh->mq = NULL;
875   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
876                                                      &listen_connect, lh);
877   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
878 }
879
880
881 /**
882  * Connect to the set service in order to listen for requests.
883  *
884  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
885  * @param tc task context if invoked as a task, NULL otherwise
886  */
887 static void
888 listen_connect (void *cls,
889                 const struct GNUNET_SCHEDULER_TaskContext *tc)
890 {
891   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
892     { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
893     GNUNET_MQ_HANDLERS_END
894   };
895   struct GNUNET_SET_ListenHandle *lh = cls;
896   struct GNUNET_MQ_Envelope *mqm;
897   struct GNUNET_SET_ListenMessage *msg;
898
899   if ( (NULL != tc) &&
900        (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
901   {
902     LOG (GNUNET_ERROR_TYPE_DEBUG,
903          "Listener not reconnecting due to shutdown\n");
904     return;
905   }
906   lh->reconnect_task = NULL;
907   GNUNET_assert (NULL == lh->client);
908   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
909   if (NULL == lh->client)
910     return;
911   GNUNET_assert (NULL == lh->mq);
912   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
913                                                   mq_handlers,
914                                                   &handle_client_listener_error, lh);
915   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
916   msg->operation = htonl (lh->operation);
917   msg->app_id = lh->app_id;
918   GNUNET_MQ_send (lh->mq, mqm);
919 }
920
921
922 /**
923  * Wait for set operation requests for the given application id
924  *
925  * @param cfg configuration to use for connecting to
926  *            the set service, needs to be valid for the lifetime of the listen handle
927  * @param operation operation we want to listen for
928  * @param app_id id of the application that handles set operation requests
929  * @param listen_cb called for each incoming request matching the operation
930  *                  and application id
931  * @param listen_cls handle for @a listen_cb
932  * @return a handle that can be used to cancel the listen operation
933  */
934 struct GNUNET_SET_ListenHandle *
935 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
936                    enum GNUNET_SET_OperationType operation,
937                    const struct GNUNET_HashCode *app_id,
938                    GNUNET_SET_ListenCallback listen_cb,
939                    void *listen_cls)
940 {
941   struct GNUNET_SET_ListenHandle *lh;
942
943   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
944   lh->listen_cb = listen_cb;
945   lh->listen_cls = listen_cls;
946   lh->cfg = cfg;
947   lh->operation = operation;
948   lh->app_id = *app_id;
949   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
950   listen_connect (lh, NULL);
951   if (NULL == lh->client)
952   {
953     GNUNET_free (lh);
954     return NULL;
955   }
956   return lh;
957 }
958
959
960 /**
961  * Cancel the given listen operation.
962  *
963  * @param lh handle for the listen operation
964  */
965 void
966 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
967 {
968   LOG (GNUNET_ERROR_TYPE_DEBUG,
969        "Canceling listener\n");
970   if (NULL != lh->mq)
971   {
972     GNUNET_MQ_destroy (lh->mq);
973     lh->mq = NULL;
974   }
975   if (NULL != lh->client)
976   {
977     GNUNET_CLIENT_disconnect (lh->client);
978     lh->client = NULL;
979   }
980   if (NULL != lh->reconnect_task)
981   {
982     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
983     lh->reconnect_task = NULL;
984   }
985   GNUNET_free (lh);
986 }
987
988
989 /**
990  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
991  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
992  * afterwards.
993  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
994  * and to begin the exchange with the remote peer.
995  *
996  * @param request request to accept
997  * @param result_mode specified how results will be returned,
998  *        see `enum GNUNET_SET_ResultMode`.
999  * @param result_cb callback for the results
1000  * @param result_cls closure for @a result_cb
1001  * @return a handle to cancel the operation
1002  */
1003 struct GNUNET_SET_OperationHandle *
1004 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1005                    enum GNUNET_SET_ResultMode result_mode,
1006                    GNUNET_SET_ResultIterator result_cb,
1007                    void *result_cls)
1008 {
1009   struct GNUNET_MQ_Envelope *mqm;
1010   struct GNUNET_SET_OperationHandle *oh;
1011   struct GNUNET_SET_AcceptMessage *msg;
1012
1013   GNUNET_assert (GNUNET_NO == request->accepted);
1014   request->accepted = GNUNET_YES;
1015   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1016   msg->accept_reject_id = htonl (request->accept_id);
1017   msg->result_mode = htonl (result_mode);
1018   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1019   oh->result_cb = result_cb;
1020   oh->result_cls = result_cls;
1021   oh->conclude_mqm = mqm;
1022   oh->request_id_addr = &msg->request_id;
1023   return oh;
1024 }
1025
1026
1027 /**
1028  * Commit a set to be used with a set operation.
1029  * This function is called once we have fully constructed
1030  * the set that we want to use for the operation.  At this
1031  * time, the P2P protocol can then begin to exchange the
1032  * set information and call the result callback with the
1033  * result information.
1034  *
1035  * @param oh handle to the set operation
1036  * @param set the set to use for the operation
1037  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1038  *         set is invalid (e.g. the set service crashed)
1039  */
1040 int
1041 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1042                    struct GNUNET_SET_Handle *set)
1043 {
1044   GNUNET_assert (NULL == oh->set);
1045   if (GNUNET_YES == set->invalid)
1046     return GNUNET_SYSERR;
1047   GNUNET_assert (NULL != oh->conclude_mqm);
1048   oh->set = set;
1049   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1050                                set->ops_tail,
1051                                oh);
1052   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
1053   *oh->request_id_addr = htonl (oh->request_id);
1054   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
1055   oh->conclude_mqm = NULL;
1056   oh->request_id_addr = NULL;
1057   return GNUNET_OK;
1058 }
1059
1060
1061 /**
1062  * Iterate over all elements in the given set.  Note that this
1063  * operation involves transferring every element of the set from the
1064  * service to the client, and is thus costly.
1065  *
1066  * @param set the set to iterate over
1067  * @param iter the iterator to call for each element
1068  * @param iter_cls closure for @a iter
1069  * @return #GNUNET_YES if the iteration started successfuly,
1070  *         #GNUNET_NO if another iteration is active
1071  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1072  */
1073 int
1074 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1075                     GNUNET_SET_ElementIterator iter,
1076                     void *iter_cls)
1077 {
1078   struct GNUNET_MQ_Envelope *ev;
1079
1080   GNUNET_assert (NULL != iter);
1081   if (GNUNET_YES == set->invalid)
1082     return GNUNET_SYSERR;
1083   if (NULL != set->iterator)
1084     return GNUNET_NO;
1085   LOG (GNUNET_ERROR_TYPE_DEBUG,
1086        "Iterating over set\n");
1087   set->iterator = iter;
1088   set->iterator_cls = iter_cls;
1089   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1090   GNUNET_MQ_send (set->mq, ev);
1091   return GNUNET_YES;
1092 }
1093
1094
1095 void
1096 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1097                       GNUNET_SET_CopyReadyCallback cb,
1098                       void *cls)
1099 {
1100   struct GNUNET_MQ_Envelope *ev;
1101   struct SetCopyRequest *req;
1102
1103   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1104   GNUNET_MQ_send (set->mq, ev);
1105
1106   req = GNUNET_new (struct SetCopyRequest);
1107   req->cb = cb;
1108   req->cls = cls;
1109   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1110                                set->copy_req_tail,
1111                                req);
1112 }
1113
1114
1115 /* end of set_api.c */