920571a01877da9ebf2663710cce511cadbd9a1a
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_set_service.h"
31 #include "set.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35
36 struct SetCopyRequest
37 {
38   struct SetCopyRequest *next;
39
40   struct SetCopyRequest *prev;
41
42   void *cls;
43
44   GNUNET_SET_CopyReadyCallback cb;
45 };
46
47 /**
48  * Opaque handle to a set.
49  */
50 struct GNUNET_SET_Handle
51 {
52   /**
53    * Client connected to the set service.
54    */
55   struct GNUNET_CLIENT_Connection *client;
56
57   /**
58    * Message queue for @e client.
59    */
60   struct GNUNET_MQ_Handle *mq;
61
62   /**
63    * Linked list of operations on the set.
64    */
65   struct GNUNET_SET_OperationHandle *ops_head;
66
67   /**
68    * Linked list of operations on the set.
69    */
70   struct GNUNET_SET_OperationHandle *ops_tail;
71
72   /**
73    * Callback for the current iteration over the set,
74    * NULL if no iterator is active.
75    */
76   GNUNET_SET_ElementIterator iterator;
77
78   /**
79    * Closure for @e iterator
80    */
81   void *iterator_cls;
82
83   /**
84    * Should the set be destroyed once all operations are gone?
85    */
86   int destroy_requested;
87
88   /**
89    * Has the set become invalid (e.g. service died)?
90    */
91   int invalid;
92
93   /**
94    * Both client and service count the number of iterators
95    * created so far to match replies with iterators.
96    */
97   uint16_t iteration_id;
98
99   /**
100    * Configuration, needed when creating (lazy) copies.
101    */
102   const struct GNUNET_CONFIGURATION_Handle *cfg;
103
104   /**
105    * Doubly linked list of copy requests.
106    */
107   struct SetCopyRequest *copy_req_head;
108
109   /**
110    * Doubly linked list of copy requests.
111    */
112   struct SetCopyRequest *copy_req_tail;
113 };
114
115
116 /**
117  * Handle for a set operation request from another peer.
118  */
119 struct GNUNET_SET_Request
120 {
121   /**
122    * Id of the request, used to identify the request when
123    * accepting/rejecting it.
124    */
125   uint32_t accept_id;
126
127   /**
128    * Has the request been accepted already?
129    * #GNUNET_YES/#GNUNET_NO
130    */
131   int accepted;
132 };
133
134
135 /**
136  * Handle to an operation.  Only known to the service after committing
137  * the handle with a set.
138  */
139 struct GNUNET_SET_OperationHandle
140 {
141   /**
142    * Function to be called when we have a result,
143    * or an error.
144    */
145   GNUNET_SET_ResultIterator result_cb;
146
147   /**
148    * Closure for @e result_cb.
149    */
150   void *result_cls;
151
152   /**
153    * Local set used for the operation,
154    * NULL if no set has been provided by conclude yet.
155    */
156   struct GNUNET_SET_Handle *set;
157
158   /**
159    * Message sent to the server on calling conclude,
160    * NULL if conclude has been called.
161    */
162   struct GNUNET_MQ_Envelope *conclude_mqm;
163
164   /**
165    * Address of the request if in the conclude message,
166    * used to patch the request id into the message when the set is known.
167    */
168   uint32_t *request_id_addr;
169
170   /**
171    * Handles are kept in a linked list.
172    */
173   struct GNUNET_SET_OperationHandle *prev;
174
175   /**
176    * Handles are kept in a linked list.
177    */
178   struct GNUNET_SET_OperationHandle *next;
179
180   /**
181    * Request ID to identify the operation within the set.
182    */
183   uint32_t request_id;
184 };
185
186
187 /**
188  * Opaque handle to a listen operation.
189  */
190 struct GNUNET_SET_ListenHandle
191 {
192   /**
193    * Connection to the service.
194    */
195   struct GNUNET_CLIENT_Connection *client;
196
197   /**
198    * Message queue for the client.
199    */
200   struct GNUNET_MQ_Handle* mq;
201
202   /**
203    * Configuration handle for the listener, stored
204    * here to be able to reconnect transparently on
205    * connection failure.
206    */
207   const struct GNUNET_CONFIGURATION_Handle *cfg;
208
209   /**
210    * Function to call on a new incoming request,
211    * or on error.
212    */
213   GNUNET_SET_ListenCallback listen_cb;
214
215   /**
216    * Closure for @e listen_cb.
217    */
218   void *listen_cls;
219
220   /**
221    * Application ID we listen for.
222    */
223   struct GNUNET_HashCode app_id;
224
225   /**
226    * Time to wait until we try to reconnect on failure.
227    */
228   struct GNUNET_TIME_Relative reconnect_backoff;
229
230   /**
231    * Task for reconnecting when the listener fails.
232    */
233   struct GNUNET_SCHEDULER_Task * reconnect_task;
234
235   /**
236    * Operation we listen for.
237    */
238   enum GNUNET_SET_OperationType operation;
239 };
240
241
242 /* mutual recursion with handle_copy_lazy */
243 static struct GNUNET_SET_Handle *
244 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
245                  enum GNUNET_SET_OperationType op,
246                  uint32_t *cookie);
247
248
249 /**
250  * Handle element for iteration over the set.  Notifies the
251  * iterator and sends an acknowledgement to the service.
252  *
253  * @param cls the `struct GNUNET_SET_Handle *`
254  * @param mh the message
255  */
256 static void
257 handle_copy_lazy (void *cls,
258                   const struct GNUNET_MessageHeader *mh)
259 {
260   struct GNUNET_SET_CopyLazyResponseMessage *msg;
261   struct GNUNET_SET_Handle *set = cls;
262   struct SetCopyRequest *req;
263   struct GNUNET_SET_Handle *new_set;
264
265   msg = (struct GNUNET_SET_CopyLazyResponseMessage *) mh;
266
267   req = set->copy_req_head;
268
269   if (NULL == req)
270   {
271     /* Service sent us unsolicited lazy copy response */
272     GNUNET_break (0);
273     return;
274   }
275
276   LOG (GNUNET_ERROR_TYPE_DEBUG,
277        "Handling response to lazy copy\n");
278   
279   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
280                                set->copy_req_tail,
281                                req);
282
283   
284   // We pass none as operation here, since it doesn't matter when
285   // cloning.
286   new_set = create_internal (set->cfg, GNUNET_SET_OPERATION_NONE, &msg->cookie);
287
288   req->cb (req->cls, new_set);
289
290   GNUNET_free (req);
291 }
292
293
294 /**
295  * Handle element for iteration over the set.  Notifies the
296  * iterator and sends an acknowledgement to the service.
297  *
298  * @param cls the `struct GNUNET_SET_Handle *`
299  * @param mh the message
300  */
301 static void
302 handle_iter_element (void *cls,
303                      const struct GNUNET_MessageHeader *mh)
304 {
305   struct GNUNET_SET_Handle *set = cls;
306   GNUNET_SET_ElementIterator iter = set->iterator;
307   struct GNUNET_SET_Element element;
308   const struct GNUNET_SET_IterResponseMessage *msg;
309   struct GNUNET_SET_IterAckMessage *ack_msg;
310   struct GNUNET_MQ_Envelope *ev;
311   uint16_t msize;
312
313   msize = ntohs (mh->size);
314   if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
315   {
316     /* message malformed */
317     GNUNET_break (0);
318     set->iterator = NULL;
319     set->iteration_id++;
320     iter (set->iterator_cls,
321           NULL);
322     iter = NULL;
323   }
324   msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
325   if (set->iteration_id != ntohs (msg->iteration_id))
326   {
327     /* element from a previous iteration, skip! */
328     iter = NULL;
329   }
330   if (NULL != iter)
331   {
332     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
333     element.element_type = htons (msg->element_type);
334     element.data = &msg[1];
335     iter (set->iterator_cls,
336           &element);
337   }
338   ev = GNUNET_MQ_msg (ack_msg,
339                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
340   ack_msg->send_more = htonl ((NULL != iter));
341   GNUNET_MQ_send (set->mq, ev);
342 }
343
344
345 /**
346  * Handle message signalling conclusion of iteration over the set.
347  * Notifies the iterator that we are done.
348  *
349  * @param cls the set
350  * @param mh the message
351  */
352 static void
353 handle_iter_done (void *cls,
354                   const struct GNUNET_MessageHeader *mh)
355 {
356   struct GNUNET_SET_Handle *set = cls;
357   GNUNET_SET_ElementIterator iter = set->iterator;
358
359   if (NULL == iter)
360     return;
361   set->iterator = NULL;
362   set->iteration_id++;
363   iter (set->iterator_cls,
364         NULL);
365 }
366
367
368 /**
369  * Handle result message for a set operation.
370  *
371  * @param cls the set
372  * @param mh the message
373  */
374 static void
375 handle_result (void *cls,
376                const struct GNUNET_MessageHeader *mh)
377 {
378   struct GNUNET_SET_Handle *set = cls;
379   const struct GNUNET_SET_ResultMessage *msg;
380   struct GNUNET_SET_OperationHandle *oh;
381   struct GNUNET_SET_Element e;
382   enum GNUNET_SET_Status result_status;
383
384   msg = (const struct GNUNET_SET_ResultMessage *) mh;
385   GNUNET_assert (NULL != set->mq);
386   result_status = ntohs (msg->result_status);
387   LOG (GNUNET_ERROR_TYPE_DEBUG,
388        "Got result message with status %d\n",
389        result_status);
390
391   oh = GNUNET_MQ_assoc_get (set->mq,
392                             ntohl (msg->request_id));
393   if (NULL == oh)
394   {
395     /* 'oh' can be NULL if we canceled the operation, but the service
396        did not get the cancel message yet. */
397     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
398                 "Ignoring result from canceled operation\n");
399     return;
400   }
401   if (GNUNET_SET_STATUS_OK != result_status)
402   {
403     /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
404      * and this is the last result message we get */
405     GNUNET_MQ_assoc_remove (set->mq,
406                             ntohl (msg->request_id));
407     GNUNET_CONTAINER_DLL_remove (set->ops_head,
408                                  set->ops_tail,
409                                  oh);
410     if ( (GNUNET_YES == set->destroy_requested) &&
411          (NULL == set->ops_head) )
412       GNUNET_SET_destroy (set);
413     if (NULL != oh->result_cb)
414       oh->result_cb (oh->result_cls,
415                      NULL,
416                      result_status);
417     switch (result_status)
418     {
419     case GNUNET_SET_STATUS_OK:
420       break;
421     case GNUNET_SET_STATUS_FAILURE:
422       oh->result_cb = NULL;
423       break;
424     case GNUNET_SET_STATUS_HALF_DONE:
425       break;
426     case GNUNET_SET_STATUS_DONE:
427       oh->result_cb = NULL;
428       break;
429     }
430     GNUNET_free (oh);
431     return;
432   }
433   e.data = &msg[1];
434   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
435   e.element_type = msg->element_type;
436   if (NULL != oh->result_cb)
437     oh->result_cb (oh->result_cls,
438                    &e,
439                    result_status);
440 }
441
442
443 /**
444  * Destroy the given set operation.
445  *
446  * @param oh set operation to destroy
447  */
448 static void
449 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
450 {
451   struct GNUNET_SET_Handle *set = oh->set;
452   struct GNUNET_SET_OperationHandle *h_assoc;
453
454   if (NULL != oh->conclude_mqm)
455     GNUNET_MQ_discard (oh->conclude_mqm);
456   /* is the operation already commited? */
457   if (NULL != set)
458   {
459     GNUNET_CONTAINER_DLL_remove (set->ops_head,
460                                  set->ops_tail,
461                                  oh);
462     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
463                                       oh->request_id);
464     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
465   }
466   GNUNET_free (oh);
467 }
468
469
470 /**
471  * Cancel the given set operation.  We need to send an explicit cancel
472  * message, as all operations one one set communicate using one
473  * handle.
474  *
475  * @param oh set operation to cancel
476  */
477 void
478 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
479 {
480   struct GNUNET_SET_Handle *set = oh->set;
481   struct GNUNET_SET_CancelMessage *m;
482   struct GNUNET_MQ_Envelope *mqm;
483
484   if (NULL != set)
485   {
486     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
487     m->request_id = htonl (oh->request_id);
488     GNUNET_MQ_send (set->mq, mqm);
489   }
490   set_operation_destroy (oh);
491   if ( (NULL != set) &&
492        (GNUNET_YES == set->destroy_requested) &&
493        (NULL == set->ops_head) )
494   {
495     LOG (GNUNET_ERROR_TYPE_DEBUG,
496          "Destroying set after operation cancel\n");
497     GNUNET_SET_destroy (set);
498   }
499 }
500
501
502 /**
503  * We encountered an error communicating with the set service while
504  * performing a set operation. Report to the application.
505  *
506  * @param cls the `struct GNUNET_SET_Handle`
507  * @param error error code
508  */
509 static void
510 handle_client_set_error (void *cls,
511                          enum GNUNET_MQ_Error error)
512 {
513   struct GNUNET_SET_Handle *set = cls;
514
515   LOG (GNUNET_ERROR_TYPE_DEBUG,
516        "Handling client set error %d\n",
517        error);
518   while (NULL != set->ops_head)
519   {
520     if (NULL != set->ops_head->result_cb)
521       set->ops_head->result_cb (set->ops_head->result_cls,
522                                 NULL,
523                                 GNUNET_SET_STATUS_FAILURE);
524     set_operation_destroy (set->ops_head);
525   }
526   set->invalid = GNUNET_YES;
527   if (GNUNET_YES == set->destroy_requested)
528   {
529     LOG (GNUNET_ERROR_TYPE_DEBUG,
530          "Destroying set after operation failure\n");
531     GNUNET_SET_destroy (set);
532   }
533 }
534
535
536 static struct GNUNET_SET_Handle *
537 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
538                  enum GNUNET_SET_OperationType op,
539                  uint32_t *cookie)
540 {
541   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
542     { &handle_result,
543       GNUNET_MESSAGE_TYPE_SET_RESULT,
544       0 },
545     { &handle_iter_element,
546       GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
547       0 },
548     { &handle_iter_done,
549       GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
550       sizeof (struct GNUNET_MessageHeader) },
551     { &handle_copy_lazy,
552       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
553       sizeof (struct GNUNET_SET_CopyLazyResponseMessage) },
554     GNUNET_MQ_HANDLERS_END
555   };
556   struct GNUNET_SET_Handle *set;
557   struct GNUNET_MQ_Envelope *mqm;
558   struct GNUNET_SET_CreateMessage *create_msg;
559   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
560
561   set = GNUNET_new (struct GNUNET_SET_Handle);
562   set->client = GNUNET_CLIENT_connect ("set", cfg);
563   set->cfg = cfg;
564   if (NULL == set->client)
565   {
566     GNUNET_free (set);
567     return NULL;
568   }
569   set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
570                                                    mq_handlers,
571                                                    &handle_client_set_error,
572                                                    set);
573   GNUNET_assert (NULL != set->mq);
574
575   if (NULL == cookie)
576   {
577     LOG (GNUNET_ERROR_TYPE_DEBUG,
578          "Creating new set (operation %u)\n",
579          op);
580     mqm = GNUNET_MQ_msg (create_msg,
581                          GNUNET_MESSAGE_TYPE_SET_CREATE);
582     create_msg->operation = htonl (op);
583   }
584   else
585   {
586     LOG (GNUNET_ERROR_TYPE_DEBUG,
587          "Creating new set (lazy copy)\n",
588          op);
589     mqm = GNUNET_MQ_msg (copy_msg,
590                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
591     copy_msg->cookie = *cookie;
592   }
593   GNUNET_MQ_send (set->mq, mqm);
594   return set;
595 }
596
597
598 /**
599  * Create an empty set, supporting the specified operation.
600  *
601  * @param cfg configuration to use for connecting to the
602  *        set service
603  * @param op operation supported by the set
604  *        Note that the operation has to be specified
605  *        beforehand, as certain set operations need to maintain
606  *        data structures spefific to the operation
607  * @return a handle to the set
608  */
609 struct GNUNET_SET_Handle *
610 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
611                    enum GNUNET_SET_OperationType op)
612 {
613   return create_internal (cfg, op, NULL);
614 }
615
616
617 /**
618  * Add an element to the given set.  After the element has been added
619  * (in the sense of being transmitted to the set service), @a cont
620  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
621  * queued.
622  *
623  * @param set set to add element to
624  * @param element element to add to the set
625  * @param cont continuation called after the element has been added
626  * @param cont_cls closure for @a cont
627  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
628  *         set is invalid (e.g. the set service crashed)
629  */
630 int
631 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
632                         const struct GNUNET_SET_Element *element,
633                         GNUNET_SET_Continuation cont,
634                         void *cont_cls)
635 {
636   struct GNUNET_MQ_Envelope *mqm;
637   struct GNUNET_SET_ElementMessage *msg;
638
639   if (GNUNET_YES == set->invalid)
640   {
641     if (NULL != cont)
642       cont (cont_cls);
643     return GNUNET_SYSERR;
644   }
645   mqm = GNUNET_MQ_msg_extra (msg, element->size,
646                              GNUNET_MESSAGE_TYPE_SET_ADD);
647   msg->element_type = element->element_type;
648   memcpy (&msg[1],
649           element->data,
650           element->size);
651   GNUNET_MQ_notify_sent (mqm,
652                          cont, cont_cls);
653   GNUNET_MQ_send (set->mq, mqm);
654   return GNUNET_OK;
655 }
656
657
658 /**
659  * Remove an element to the given set.  After the element has been
660  * removed (in the sense of the request being transmitted to the set
661  * service), @a cont will be called.  Multiple calls to
662  * GNUNET_SET_remove_element() can be queued
663  *
664  * @param set set to remove element from
665  * @param element element to remove from the set
666  * @param cont continuation called after the element has been removed
667  * @param cont_cls closure for @a cont
668  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
669  *         set is invalid (e.g. the set service crashed)
670  */
671 int
672 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
673                            const struct GNUNET_SET_Element *element,
674                            GNUNET_SET_Continuation cont,
675                            void *cont_cls)
676 {
677   struct GNUNET_MQ_Envelope *mqm;
678   struct GNUNET_SET_ElementMessage *msg;
679
680   if (GNUNET_YES == set->invalid)
681   {
682     if (NULL != cont)
683       cont (cont_cls);
684     return GNUNET_SYSERR;
685   }
686   mqm = GNUNET_MQ_msg_extra (msg,
687                              element->size,
688                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
689   msg->element_type = element->element_type;
690   memcpy (&msg[1],
691           element->data,
692           element->size);
693   GNUNET_MQ_notify_sent (mqm,
694                          cont, cont_cls);
695   GNUNET_MQ_send (set->mq, mqm);
696   return GNUNET_OK;
697 }
698
699
700 /**
701  * Destroy the set handle if no operations are left, mark the set
702  * for destruction otherwise.
703  *
704  * @param set set handle to destroy
705  */
706 void
707 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
708 {
709   /* destroying set while iterator is active is currently
710      not supported; we should expand the API to allow
711      clients to explicitly cancel the iteration! */
712   GNUNET_assert (NULL == set->iterator);
713   if (NULL != set->ops_head)
714   {
715     LOG (GNUNET_ERROR_TYPE_DEBUG,
716          "Set operations are pending, delaying set destruction\n");
717     set->destroy_requested = GNUNET_YES;
718     return;
719   }
720   LOG (GNUNET_ERROR_TYPE_DEBUG,
721        "Really destroying set\n");
722   if (NULL != set->client)
723   {
724     GNUNET_CLIENT_disconnect (set->client);
725     set->client = NULL;
726   }
727   if (NULL != set->mq)
728   {
729     GNUNET_MQ_destroy (set->mq);
730     set->mq = NULL;
731   }
732   GNUNET_free (set);
733 }
734
735
736 /**
737  * Prepare a set operation to be evaluated with another peer.
738  * The evaluation will not start until the client provides
739  * a local set with #GNUNET_SET_commit().
740  *
741  * @param other_peer peer with the other set
742  * @param app_id hash for the application using the set
743  * @param context_msg additional information for the request
744  * @param result_mode specified how results will be returned,
745  *        see `enum GNUNET_SET_ResultMode`.
746  * @param result_cb called on error or success
747  * @param result_cls closure for @e result_cb
748  * @return a handle to cancel the operation
749  */
750 struct GNUNET_SET_OperationHandle *
751 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
752                     const struct GNUNET_HashCode *app_id,
753                     const struct GNUNET_MessageHeader *context_msg,
754                     enum GNUNET_SET_ResultMode result_mode,
755                     GNUNET_SET_ResultIterator result_cb,
756                     void *result_cls)
757 {
758   struct GNUNET_MQ_Envelope *mqm;
759   struct GNUNET_SET_OperationHandle *oh;
760   struct GNUNET_SET_EvaluateMessage *msg;
761
762   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
763   oh->result_cb = result_cb;
764   oh->result_cls = result_cls;
765   mqm = GNUNET_MQ_msg_nested_mh (msg,
766                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
767                                  context_msg);
768   msg->app_id = *app_id;
769   msg->result_mode = htonl (result_mode);
770   msg->target_peer = *other_peer;
771   oh->conclude_mqm = mqm;
772   oh->request_id_addr = &msg->request_id;
773
774   return oh;
775 }
776
777
778 /**
779  * Connect to the set service in order to listen for requests.
780  *
781  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
782  * @param tc task context if invoked as a task, NULL otherwise
783  */
784 static void
785 listen_connect (void *cls,
786                 const struct GNUNET_SCHEDULER_TaskContext *tc);
787
788
789 /**
790  * Handle request message for a listen operation
791  *
792  * @param cls the listen handle
793  * @param mh the message
794  */
795 static void
796 handle_request (void *cls,
797                 const struct GNUNET_MessageHeader *mh)
798 {
799   struct GNUNET_SET_ListenHandle *lh = cls;
800   const struct GNUNET_SET_RequestMessage *msg;
801   struct GNUNET_SET_Request req;
802   const struct GNUNET_MessageHeader *context_msg;
803   uint16_t msize;
804   struct GNUNET_MQ_Envelope *mqm;
805   struct GNUNET_SET_RejectMessage *rmsg;
806
807   LOG (GNUNET_ERROR_TYPE_DEBUG,
808        "Processing incoming operation request\n");
809   msize = ntohs (mh->size);
810   if (msize < sizeof (struct GNUNET_SET_RequestMessage))
811   {
812     GNUNET_break (0);
813     GNUNET_CLIENT_disconnect (lh->client);
814     lh->client = NULL;
815     GNUNET_MQ_destroy (lh->mq);
816     lh->mq = NULL;
817     lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
818                                                        &listen_connect, lh);
819     lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
820     return;
821   }
822   /* we got another valid request => reset the backoff */
823   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
824   msg = (const struct GNUNET_SET_RequestMessage *) mh;
825   req.accept_id = ntohl (msg->accept_id);
826   req.accepted = GNUNET_NO;
827   context_msg = GNUNET_MQ_extract_nested_mh (msg);
828   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
829   lh->listen_cb (lh->listen_cls,
830                  &msg->peer_id,
831                  context_msg,
832                  &req);
833   if (GNUNET_YES == req.accepted)
834     return; /* the accept-case is handled in #GNUNET_SET_accept() */
835   LOG (GNUNET_ERROR_TYPE_DEBUG,
836        "Rejecting request\n");
837   mqm = GNUNET_MQ_msg (rmsg,
838                        GNUNET_MESSAGE_TYPE_SET_REJECT);
839   rmsg->accept_reject_id = msg->accept_id;
840   GNUNET_MQ_send (lh->mq, mqm);
841 }
842
843
844 /**
845  * Our connection with the set service encountered an error,
846  * re-initialize with exponential back-off.
847  *
848  * @param cls the `struct GNUNET_SET_ListenHandle *`
849  * @param error reason for the disconnect
850  */
851 static void
852 handle_client_listener_error (void *cls,
853                               enum GNUNET_MQ_Error error)
854 {
855   struct GNUNET_SET_ListenHandle *lh = cls;
856
857   LOG (GNUNET_ERROR_TYPE_DEBUG,
858        "Listener broke down (%d), re-connecting\n",
859        (int) error);
860   GNUNET_CLIENT_disconnect (lh->client);
861   lh->client = NULL;
862   GNUNET_MQ_destroy (lh->mq);
863   lh->mq = NULL;
864   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
865                                                      &listen_connect, lh);
866   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
867 }
868
869
870 /**
871  * Connect to the set service in order to listen for requests.
872  *
873  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
874  * @param tc task context if invoked as a task, NULL otherwise
875  */
876 static void
877 listen_connect (void *cls,
878                 const struct GNUNET_SCHEDULER_TaskContext *tc)
879 {
880   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
881     { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
882     GNUNET_MQ_HANDLERS_END
883   };
884   struct GNUNET_SET_ListenHandle *lh = cls;
885   struct GNUNET_MQ_Envelope *mqm;
886   struct GNUNET_SET_ListenMessage *msg;
887
888   if ( (NULL != tc) &&
889        (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
890   {
891     LOG (GNUNET_ERROR_TYPE_DEBUG,
892          "Listener not reconnecting due to shutdown\n");
893     return;
894   }
895   lh->reconnect_task = NULL;
896   GNUNET_assert (NULL == lh->client);
897   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
898   if (NULL == lh->client)
899     return;
900   GNUNET_assert (NULL == lh->mq);
901   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
902                                                   mq_handlers,
903                                                   &handle_client_listener_error, lh);
904   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
905   msg->operation = htonl (lh->operation);
906   msg->app_id = lh->app_id;
907   GNUNET_MQ_send (lh->mq, mqm);
908 }
909
910
911 /**
912  * Wait for set operation requests for the given application id
913  *
914  * @param cfg configuration to use for connecting to
915  *            the set service, needs to be valid for the lifetime of the listen handle
916  * @param operation operation we want to listen for
917  * @param app_id id of the application that handles set operation requests
918  * @param listen_cb called for each incoming request matching the operation
919  *                  and application id
920  * @param listen_cls handle for @a listen_cb
921  * @return a handle that can be used to cancel the listen operation
922  */
923 struct GNUNET_SET_ListenHandle *
924 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
925                    enum GNUNET_SET_OperationType operation,
926                    const struct GNUNET_HashCode *app_id,
927                    GNUNET_SET_ListenCallback listen_cb,
928                    void *listen_cls)
929 {
930   struct GNUNET_SET_ListenHandle *lh;
931
932   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
933   lh->listen_cb = listen_cb;
934   lh->listen_cls = listen_cls;
935   lh->cfg = cfg;
936   lh->operation = operation;
937   lh->app_id = *app_id;
938   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
939   listen_connect (lh, NULL);
940   if (NULL == lh->client)
941   {
942     GNUNET_free (lh);
943     return NULL;
944   }
945   return lh;
946 }
947
948
949 /**
950  * Cancel the given listen operation.
951  *
952  * @param lh handle for the listen operation
953  */
954 void
955 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
956 {
957   LOG (GNUNET_ERROR_TYPE_DEBUG,
958        "Canceling listener\n");
959   if (NULL != lh->mq)
960   {
961     GNUNET_MQ_destroy (lh->mq);
962     lh->mq = NULL;
963   }
964   if (NULL != lh->client)
965   {
966     GNUNET_CLIENT_disconnect (lh->client);
967     lh->client = NULL;
968   }
969   if (NULL != lh->reconnect_task)
970   {
971     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
972     lh->reconnect_task = NULL;
973   }
974   GNUNET_free (lh);
975 }
976
977
978 /**
979  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
980  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
981  * afterwards.
982  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
983  * and to begin the exchange with the remote peer.
984  *
985  * @param request request to accept
986  * @param result_mode specified how results will be returned,
987  *        see `enum GNUNET_SET_ResultMode`.
988  * @param result_cb callback for the results
989  * @param result_cls closure for @a result_cb
990  * @return a handle to cancel the operation
991  */
992 struct GNUNET_SET_OperationHandle *
993 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
994                    enum GNUNET_SET_ResultMode result_mode,
995                    GNUNET_SET_ResultIterator result_cb,
996                    void *result_cls)
997 {
998   struct GNUNET_MQ_Envelope *mqm;
999   struct GNUNET_SET_OperationHandle *oh;
1000   struct GNUNET_SET_AcceptMessage *msg;
1001
1002   GNUNET_assert (GNUNET_NO == request->accepted);
1003   request->accepted = GNUNET_YES;
1004   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1005   msg->accept_reject_id = htonl (request->accept_id);
1006   msg->result_mode = htonl (result_mode);
1007   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1008   oh->result_cb = result_cb;
1009   oh->result_cls = result_cls;
1010   oh->conclude_mqm = mqm;
1011   oh->request_id_addr = &msg->request_id;
1012   return oh;
1013 }
1014
1015
1016 /**
1017  * Commit a set to be used with a set operation.
1018  * This function is called once we have fully constructed
1019  * the set that we want to use for the operation.  At this
1020  * time, the P2P protocol can then begin to exchange the
1021  * set information and call the result callback with the
1022  * result information.
1023  *
1024  * @param oh handle to the set operation
1025  * @param set the set to use for the operation
1026  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1027  *         set is invalid (e.g. the set service crashed)
1028  */
1029 int
1030 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1031                    struct GNUNET_SET_Handle *set)
1032 {
1033   GNUNET_assert (NULL == oh->set);
1034   if (GNUNET_YES == set->invalid)
1035     return GNUNET_SYSERR;
1036   GNUNET_assert (NULL != oh->conclude_mqm);
1037   oh->set = set;
1038   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1039                                set->ops_tail,
1040                                oh);
1041   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
1042   *oh->request_id_addr = htonl (oh->request_id);
1043   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
1044   oh->conclude_mqm = NULL;
1045   oh->request_id_addr = NULL;
1046   return GNUNET_OK;
1047 }
1048
1049
1050 /**
1051  * Iterate over all elements in the given set.  Note that this
1052  * operation involves transferring every element of the set from the
1053  * service to the client, and is thus costly.
1054  *
1055  * @param set the set to iterate over
1056  * @param iter the iterator to call for each element
1057  * @param iter_cls closure for @a iter
1058  * @return #GNUNET_YES if the iteration started successfuly,
1059  *         #GNUNET_NO if another iteration is active
1060  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1061  */
1062 int
1063 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1064                     GNUNET_SET_ElementIterator iter,
1065                     void *iter_cls)
1066 {
1067   struct GNUNET_MQ_Envelope *ev;
1068
1069   GNUNET_assert (NULL != iter);
1070   if (GNUNET_YES == set->invalid)
1071     return GNUNET_SYSERR;
1072   if (NULL != set->iterator)
1073     return GNUNET_NO;
1074   LOG (GNUNET_ERROR_TYPE_DEBUG,
1075        "Iterating over set\n");
1076   set->iterator = iter;
1077   set->iterator_cls = iter_cls;
1078   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1079   GNUNET_MQ_send (set->mq, ev);
1080   return GNUNET_YES;
1081 }
1082
1083
1084 void
1085 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1086                       GNUNET_SET_CopyReadyCallback cb,
1087                       void *cls)
1088 {
1089   struct GNUNET_MQ_Envelope *ev;
1090   struct SetCopyRequest *req;
1091
1092   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1093   GNUNET_MQ_send (set->mq, ev);
1094
1095   req = GNUNET_new (struct SetCopyRequest);
1096   req->cb = cb;
1097   req->cls = cls;
1098   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1099                                set->copy_req_tail,
1100                                req);
1101 }
1102
1103
1104 /* end of set_api.c */