use c99
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_set_service.h"
30 #include "set.h"
31
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
34
35 struct SetCopyRequest
36 {
37   struct SetCopyRequest *next;
38
39   struct SetCopyRequest *prev;
40
41   void *cls;
42
43   GNUNET_SET_CopyReadyCallback cb;
44 };
45
46 /**
47  * Opaque handle to a set.
48  */
49 struct GNUNET_SET_Handle
50 {
51   /**
52    * Message queue for @e client.
53    */
54   struct GNUNET_MQ_Handle *mq;
55
56   /**
57    * Linked list of operations on the set.
58    */
59   struct GNUNET_SET_OperationHandle *ops_head;
60
61   /**
62    * Linked list of operations on the set.
63    */
64   struct GNUNET_SET_OperationHandle *ops_tail;
65
66   /**
67    * Callback for the current iteration over the set,
68    * NULL if no iterator is active.
69    */
70   GNUNET_SET_ElementIterator iterator;
71
72   /**
73    * Closure for @e iterator
74    */
75   void *iterator_cls;
76
77   /**
78    * Should the set be destroyed once all operations are gone?
79    */
80   int destroy_requested;
81
82   /**
83    * Has the set become invalid (e.g. service died)?
84    */
85   int invalid;
86
87   /**
88    * Both client and service count the number of iterators
89    * created so far to match replies with iterators.
90    */
91   uint16_t iteration_id;
92
93   /**
94    * Configuration, needed when creating (lazy) copies.
95    */
96   const struct GNUNET_CONFIGURATION_Handle *cfg;
97
98   /**
99    * Doubly linked list of copy requests.
100    */
101   struct SetCopyRequest *copy_req_head;
102
103   /**
104    * Doubly linked list of copy requests.
105    */
106   struct SetCopyRequest *copy_req_tail;
107 };
108
109
110 /**
111  * Handle for a set operation request from another peer.
112  */
113 struct GNUNET_SET_Request
114 {
115   /**
116    * Id of the request, used to identify the request when
117    * accepting/rejecting it.
118    */
119   uint32_t accept_id;
120
121   /**
122    * Has the request been accepted already?
123    * #GNUNET_YES/#GNUNET_NO
124    */
125   int accepted;
126 };
127
128
129 /**
130  * Handle to an operation.  Only known to the service after committing
131  * the handle with a set.
132  */
133 struct GNUNET_SET_OperationHandle
134 {
135   /**
136    * Function to be called when we have a result,
137    * or an error.
138    */
139   GNUNET_SET_ResultIterator result_cb;
140
141   /**
142    * Closure for @e result_cb.
143    */
144   void *result_cls;
145
146   /**
147    * Local set used for the operation,
148    * NULL if no set has been provided by conclude yet.
149    */
150   struct GNUNET_SET_Handle *set;
151
152   /**
153    * Message sent to the server on calling conclude,
154    * NULL if conclude has been called.
155    */
156   struct GNUNET_MQ_Envelope *conclude_mqm;
157
158   /**
159    * Address of the request if in the conclude message,
160    * used to patch the request id into the message when the set is known.
161    */
162   uint32_t *request_id_addr;
163
164   /**
165    * Handles are kept in a linked list.
166    */
167   struct GNUNET_SET_OperationHandle *prev;
168
169   /**
170    * Handles are kept in a linked list.
171    */
172   struct GNUNET_SET_OperationHandle *next;
173
174   /**
175    * Request ID to identify the operation within the set.
176    */
177   uint32_t request_id;
178 };
179
180
181 /**
182  * Opaque handle to a listen operation.
183  */
184 struct GNUNET_SET_ListenHandle
185 {
186
187   /**
188    * Message queue for the client.
189    */
190   struct GNUNET_MQ_Handle* mq;
191
192   /**
193    * Configuration handle for the listener, stored
194    * here to be able to reconnect transparently on
195    * connection failure.
196    */
197   const struct GNUNET_CONFIGURATION_Handle *cfg;
198
199   /**
200    * Function to call on a new incoming request,
201    * or on error.
202    */
203   GNUNET_SET_ListenCallback listen_cb;
204
205   /**
206    * Closure for @e listen_cb.
207    */
208   void *listen_cls;
209
210   /**
211    * Application ID we listen for.
212    */
213   struct GNUNET_HashCode app_id;
214
215   /**
216    * Time to wait until we try to reconnect on failure.
217    */
218   struct GNUNET_TIME_Relative reconnect_backoff;
219
220   /**
221    * Task for reconnecting when the listener fails.
222    */
223   struct GNUNET_SCHEDULER_Task *reconnect_task;
224
225   /**
226    * Operation we listen for.
227    */
228   enum GNUNET_SET_OperationType operation;
229 };
230
231
232 /* mutual recursion with handle_copy_lazy */
233 static struct GNUNET_SET_Handle *
234 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
235                  enum GNUNET_SET_OperationType op,
236                  const uint32_t *cookie);
237
238
239 /**
240  * Handle element for iteration over the set.  Notifies the
241  * iterator and sends an acknowledgement to the service.
242  *
243  * @param cls the `struct GNUNET_SET_Handle *`
244  * @param msg the message
245  */
246 static void
247 handle_copy_lazy (void *cls,
248                   const struct GNUNET_SET_CopyLazyResponseMessage *msg)
249 {
250   struct GNUNET_SET_Handle *set = cls;
251   struct SetCopyRequest *req;
252   struct GNUNET_SET_Handle *new_set;
253
254   req = set->copy_req_head;
255   if (NULL == req)
256   {
257     /* Service sent us unsolicited lazy copy response */
258     GNUNET_break (0);
259     return;
260   }
261
262   LOG (GNUNET_ERROR_TYPE_DEBUG,
263        "Handling response to lazy copy\n");
264   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
265                                set->copy_req_tail,
266                                req);
267   // We pass none as operation here, since it doesn't matter when
268   // cloning.
269   new_set = create_internal (set->cfg,
270                              GNUNET_SET_OPERATION_NONE,
271                              &msg->cookie);
272   req->cb (req->cls, new_set);
273   GNUNET_free (req);
274 }
275
276
277 /**
278  * Check that the given @a msg is well-formed.
279  *
280  * @param cls closure
281  * @param msg message to check
282  * @return #GNUNET_OK if message is well-formed
283  */
284 static int
285 check_iter_element (void *cls,
286                     const struct GNUNET_SET_IterResponseMessage *msg)
287 {
288   /* minimum size was already checked, everything else is OK! */
289   return GNUNET_OK;
290 }
291
292
293 /**
294  * Handle element for iteration over the set.  Notifies the
295  * iterator and sends an acknowledgement to the service.
296  *
297  * @param cls the `struct GNUNET_SET_Handle *`
298  * @param mh the message
299  */
300  static void
301  handle_iter_element (void *cls,
302                       const struct GNUNET_SET_IterResponseMessage *msg)
303 {
304   struct GNUNET_SET_Handle *set = cls;
305   GNUNET_SET_ElementIterator iter = set->iterator;
306   struct GNUNET_SET_Element element;
307   struct GNUNET_SET_IterAckMessage *ack_msg;
308   struct GNUNET_MQ_Envelope *ev;
309   uint16_t msize;
310
311   msize = ntohs (msg->header.size);
312   if (set->iteration_id != ntohs (msg->iteration_id))
313   {
314     /* element from a previous iteration, skip! */
315     iter = NULL;
316   }
317   if (NULL != iter)
318   {
319     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
320     element.element_type = ntohs (msg->element_type);
321     element.data = &msg[1];
322     iter (set->iterator_cls,
323           &element);
324   }
325   ev = GNUNET_MQ_msg (ack_msg,
326                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
327   ack_msg->send_more = htonl ((NULL != iter));
328   GNUNET_MQ_send (set->mq, ev);
329 }
330
331
332 /**
333  * Handle message signalling conclusion of iteration over the set.
334  * Notifies the iterator that we are done.
335  *
336  * @param cls the set
337  * @param mh the message
338  */
339 static void
340 handle_iter_done (void *cls,
341                   const struct GNUNET_MessageHeader *mh)
342 {
343   struct GNUNET_SET_Handle *set = cls;
344   GNUNET_SET_ElementIterator iter = set->iterator;
345
346   if (NULL == iter)
347     return;
348   set->iterator = NULL;
349   set->iteration_id++;
350   iter (set->iterator_cls,
351         NULL);
352 }
353
354
355 /**
356  * Check that the given @a msg is well-formed.
357  *
358  * @param cls closure
359  * @param msg message to check
360  * @return #GNUNET_OK if message is well-formed
361  */
362 static int
363 check_result (void *cls,
364               const struct GNUNET_SET_ResultMessage *msg)
365 {
366   /* minimum size was already checked, everything else is OK! */
367   return GNUNET_OK;
368 }
369
370
371 /**
372  * Handle result message for a set operation.
373  *
374  * @param cls the set
375  * @param mh the message
376  */
377 static void
378 handle_result (void *cls,
379                const struct GNUNET_SET_ResultMessage *msg)
380 {
381   struct GNUNET_SET_Handle *set = cls;
382   struct GNUNET_SET_OperationHandle *oh;
383   struct GNUNET_SET_Element e;
384   enum GNUNET_SET_Status result_status;
385
386   GNUNET_assert (NULL != set->mq);
387   result_status = ntohs (msg->result_status);
388   LOG (GNUNET_ERROR_TYPE_DEBUG,
389        "Got result message with status %d\n",
390        result_status);
391
392   oh = GNUNET_MQ_assoc_get (set->mq,
393                             ntohl (msg->request_id));
394   if (NULL == oh)
395   {
396     /* 'oh' can be NULL if we canceled the operation, but the service
397        did not get the cancel message yet. */
398     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399                 "Ignoring result from canceled operation\n");
400     return;
401   }
402
403   switch (result_status)
404   {
405     case GNUNET_SET_STATUS_OK:
406     case GNUNET_SET_STATUS_ADD_LOCAL:
407     case GNUNET_SET_STATUS_ADD_REMOTE:
408       goto do_element;
409     case GNUNET_SET_STATUS_FAILURE:
410     case GNUNET_SET_STATUS_DONE:
411       goto do_final;
412     case GNUNET_SET_STATUS_HALF_DONE:
413       /* not used anymore */
414       GNUNET_assert (0);
415   }
416
417 do_final:
418   LOG (GNUNET_ERROR_TYPE_DEBUG,
419        "Treating result as final status\n");
420   GNUNET_MQ_assoc_remove (set->mq,
421                           ntohl (msg->request_id));
422   GNUNET_CONTAINER_DLL_remove (set->ops_head,
423                                set->ops_tail,
424                                oh);
425   if (NULL != oh->result_cb)
426   {
427     oh->result_cb (oh->result_cls,
428                    NULL,
429                    result_status);
430   }
431   else
432   {
433     LOG (GNUNET_ERROR_TYPE_DEBUG,
434          "No callback for final status\n");
435   }
436   if ( (GNUNET_YES == set->destroy_requested) &&
437        (NULL == set->ops_head) )
438     GNUNET_SET_destroy (set);
439   GNUNET_free (oh);
440   return;
441
442 do_element:
443   LOG (GNUNET_ERROR_TYPE_DEBUG,
444        "Treating result as element\n");
445   e.data = &msg[1];
446   e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
447   e.element_type = ntohs (msg->element_type);
448   if (NULL != oh->result_cb)
449     oh->result_cb (oh->result_cls,
450                    &e,
451                    result_status);
452 }
453
454
455 /**
456  * Destroy the given set operation.
457  *
458  * @param oh set operation to destroy
459  */
460 static void
461 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
462 {
463   struct GNUNET_SET_Handle *set = oh->set;
464   struct GNUNET_SET_OperationHandle *h_assoc;
465
466   if (NULL != oh->conclude_mqm)
467     GNUNET_MQ_discard (oh->conclude_mqm);
468   /* is the operation already commited? */
469   if (NULL != set)
470   {
471     GNUNET_CONTAINER_DLL_remove (set->ops_head,
472                                  set->ops_tail,
473                                  oh);
474     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
475                                       oh->request_id);
476     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
477   }
478   GNUNET_free (oh);
479 }
480
481
482 /**
483  * Cancel the given set operation.  We need to send an explicit cancel
484  * message, as all operations one one set communicate using one
485  * handle.
486  *
487  * @param oh set operation to cancel
488  */
489 void
490 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
491 {
492   struct GNUNET_SET_Handle *set = oh->set;
493   struct GNUNET_SET_CancelMessage *m;
494   struct GNUNET_MQ_Envelope *mqm;
495
496   if (NULL != set)
497   {
498     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
499     m->request_id = htonl (oh->request_id);
500     GNUNET_MQ_send (set->mq, mqm);
501   }
502   set_operation_destroy (oh);
503   if ( (NULL != set) &&
504        (GNUNET_YES == set->destroy_requested) &&
505        (NULL == set->ops_head) )
506   {
507     LOG (GNUNET_ERROR_TYPE_DEBUG,
508          "Destroying set after operation cancel\n");
509     GNUNET_SET_destroy (set);
510   }
511 }
512
513
514 /**
515  * We encountered an error communicating with the set service while
516  * performing a set operation. Report to the application.
517  *
518  * @param cls the `struct GNUNET_SET_Handle`
519  * @param error error code
520  */
521 static void
522 handle_client_set_error (void *cls,
523                          enum GNUNET_MQ_Error error)
524 {
525   struct GNUNET_SET_Handle *set = cls;
526   GNUNET_SET_ElementIterator iter = set->iterator;
527
528   LOG (GNUNET_ERROR_TYPE_DEBUG,
529        "Handling client set error %d\n",
530        error);
531   while (NULL != set->ops_head)
532   {
533     if (NULL != set->ops_head->result_cb)
534       set->ops_head->result_cb (set->ops_head->result_cls,
535                                 NULL,
536                                 GNUNET_SET_STATUS_FAILURE);
537     set_operation_destroy (set->ops_head);
538   }
539   set->iterator = NULL;
540   set->iteration_id++;
541   if (NULL != iter)
542     iter (set->iterator_cls,
543           NULL);
544   set->invalid = GNUNET_YES;
545   if (GNUNET_YES == set->destroy_requested)
546   {
547     LOG (GNUNET_ERROR_TYPE_DEBUG,
548          "Destroying set after operation failure\n");
549     GNUNET_SET_destroy (set);
550   }
551 }
552
553
554 static struct GNUNET_SET_Handle *
555 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
556                  enum GNUNET_SET_OperationType op,
557                  const uint32_t *cookie)
558 {
559   GNUNET_MQ_hd_var_size (result,
560                          GNUNET_MESSAGE_TYPE_SET_RESULT,
561                          struct GNUNET_SET_ResultMessage);
562   GNUNET_MQ_hd_var_size (iter_element,
563                          GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
564                          struct GNUNET_SET_IterResponseMessage);
565   GNUNET_MQ_hd_fixed_size (iter_done,
566                            GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
567                            struct GNUNET_MessageHeader);
568   GNUNET_MQ_hd_fixed_size (copy_lazy,
569                            GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
570                            struct GNUNET_SET_CopyLazyResponseMessage);
571   struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
572   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
573     make_result_handler (set),
574     make_iter_element_handler (set),
575     make_iter_done_handler (set),
576     make_copy_lazy_handler (set),
577     GNUNET_MQ_handler_end ()
578   };
579   struct GNUNET_MQ_Envelope *mqm;
580   struct GNUNET_SET_CreateMessage *create_msg;
581   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
582
583   set->cfg = cfg;
584   set->mq = GNUNET_CLIENT_connecT (cfg,
585                                    "set",
586                                    mq_handlers,
587                                    &handle_client_set_error,
588                                    set);
589   if (NULL == set->mq)
590   {
591     GNUNET_free (set);
592     return NULL;
593   }
594   if (NULL == cookie)
595   {
596     LOG (GNUNET_ERROR_TYPE_DEBUG,
597          "Creating new set (operation %u)\n",
598          op);
599     mqm = GNUNET_MQ_msg (create_msg,
600                          GNUNET_MESSAGE_TYPE_SET_CREATE);
601     create_msg->operation = htonl (op);
602   }
603   else
604   {
605     LOG (GNUNET_ERROR_TYPE_DEBUG,
606          "Creating new set (lazy copy)\n",
607          op);
608     mqm = GNUNET_MQ_msg (copy_msg,
609                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
610     copy_msg->cookie = *cookie;
611   }
612   GNUNET_MQ_send (set->mq, mqm);
613   return set;
614 }
615
616
617 /**
618  * Create an empty set, supporting the specified operation.
619  *
620  * @param cfg configuration to use for connecting to the
621  *        set service
622  * @param op operation supported by the set
623  *        Note that the operation has to be specified
624  *        beforehand, as certain set operations need to maintain
625  *        data structures spefific to the operation
626  * @return a handle to the set
627  */
628 struct GNUNET_SET_Handle *
629 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
630                    enum GNUNET_SET_OperationType op)
631 {
632   return create_internal (cfg, op, NULL);
633 }
634
635
636 /**
637  * Add an element to the given set.  After the element has been added
638  * (in the sense of being transmitted to the set service), @a cont
639  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
640  * queued.
641  *
642  * @param set set to add element to
643  * @param element element to add to the set
644  * @param cont continuation called after the element has been added
645  * @param cont_cls closure for @a cont
646  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
647  *         set is invalid (e.g. the set service crashed)
648  */
649 int
650 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
651                         const struct GNUNET_SET_Element *element,
652                         GNUNET_SET_Continuation cont,
653                         void *cont_cls)
654 {
655   struct GNUNET_MQ_Envelope *mqm;
656   struct GNUNET_SET_ElementMessage *msg;
657
658   if (GNUNET_YES == set->invalid)
659   {
660     if (NULL != cont)
661       cont (cont_cls);
662     return GNUNET_SYSERR;
663   }
664   mqm = GNUNET_MQ_msg_extra (msg, element->size,
665                              GNUNET_MESSAGE_TYPE_SET_ADD);
666   msg->element_type = htons (element->element_type);
667   memcpy (&msg[1],
668           element->data,
669           element->size);
670   GNUNET_MQ_notify_sent (mqm,
671                          cont, cont_cls);
672   GNUNET_MQ_send (set->mq, mqm);
673   return GNUNET_OK;
674 }
675
676
677 /**
678  * Remove an element to the given set.  After the element has been
679  * removed (in the sense of the request being transmitted to the set
680  * service), @a cont will be called.  Multiple calls to
681  * GNUNET_SET_remove_element() can be queued
682  *
683  * @param set set to remove element from
684  * @param element element to remove from the set
685  * @param cont continuation called after the element has been removed
686  * @param cont_cls closure for @a cont
687  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
688  *         set is invalid (e.g. the set service crashed)
689  */
690 int
691 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
692                            const struct GNUNET_SET_Element *element,
693                            GNUNET_SET_Continuation cont,
694                            void *cont_cls)
695 {
696   struct GNUNET_MQ_Envelope *mqm;
697   struct GNUNET_SET_ElementMessage *msg;
698
699   if (GNUNET_YES == set->invalid)
700   {
701     if (NULL != cont)
702       cont (cont_cls);
703     return GNUNET_SYSERR;
704   }
705   mqm = GNUNET_MQ_msg_extra (msg,
706                              element->size,
707                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
708   msg->element_type = htons (element->element_type);
709   memcpy (&msg[1],
710           element->data,
711           element->size);
712   GNUNET_MQ_notify_sent (mqm,
713                          cont, cont_cls);
714   GNUNET_MQ_send (set->mq, mqm);
715   return GNUNET_OK;
716 }
717
718
719 /**
720  * Destroy the set handle if no operations are left, mark the set
721  * for destruction otherwise.
722  *
723  * @param set set handle to destroy
724  */
725 void
726 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
727 {
728   /* destroying set while iterator is active is currently
729      not supported; we should expand the API to allow
730      clients to explicitly cancel the iteration! */
731   GNUNET_assert (NULL == set->iterator);
732   if (NULL != set->ops_head)
733   {
734     LOG (GNUNET_ERROR_TYPE_DEBUG,
735          "Set operations are pending, delaying set destruction\n");
736     set->destroy_requested = GNUNET_YES;
737     return;
738   }
739   LOG (GNUNET_ERROR_TYPE_DEBUG,
740        "Really destroying set\n");
741   if (NULL != set->mq)
742   {
743     GNUNET_MQ_destroy (set->mq);
744     set->mq = NULL;
745   }
746   GNUNET_free (set);
747 }
748
749
750 /**
751  * Prepare a set operation to be evaluated with another peer.
752  * The evaluation will not start until the client provides
753  * a local set with #GNUNET_SET_commit().
754  *
755  * @param other_peer peer with the other set
756  * @param app_id hash for the application using the set
757  * @param context_msg additional information for the request
758  * @param result_mode specified how results will be returned,
759  *        see `enum GNUNET_SET_ResultMode`.
760  * @param result_cb called on error or success
761  * @param result_cls closure for @e result_cb
762  * @return a handle to cancel the operation
763  */
764 struct GNUNET_SET_OperationHandle *
765 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
766                     const struct GNUNET_HashCode *app_id,
767                     const struct GNUNET_MessageHeader *context_msg,
768                     enum GNUNET_SET_ResultMode result_mode,
769                     GNUNET_SET_ResultIterator result_cb,
770                     void *result_cls)
771 {
772   struct GNUNET_MQ_Envelope *mqm;
773   struct GNUNET_SET_OperationHandle *oh;
774   struct GNUNET_SET_EvaluateMessage *msg;
775
776   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
777   oh->result_cb = result_cb;
778   oh->result_cls = result_cls;
779   mqm = GNUNET_MQ_msg_nested_mh (msg,
780                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
781                                  context_msg);
782   msg->app_id = *app_id;
783   msg->result_mode = htonl (result_mode);
784   msg->target_peer = *other_peer;
785   oh->conclude_mqm = mqm;
786   oh->request_id_addr = &msg->request_id;
787
788   return oh;
789 }
790
791
792 /**
793  * Connect to the set service in order to listen for requests.
794  *
795  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
796  */
797 static void
798 listen_connect (void *cls);
799
800
801 /**
802  * Check validity of request message for a listen operation
803  *
804  * @param cls the listen handle
805  * @param msg the message
806  * @return #GNUNET_OK if the message is well-formed
807  */
808 static int
809 check_request (void *cls,
810                const struct GNUNET_SET_RequestMessage *msg)
811 {
812   const struct GNUNET_MessageHeader *context_msg;
813
814   context_msg = GNUNET_MQ_extract_nested_mh (msg);
815   if (NULL == context_msg)
816   {
817     GNUNET_break_op (0);
818     return GNUNET_SYSERR;
819   }
820   return GNUNET_OK;
821 }
822
823
824 /**
825  * Handle request message for a listen operation
826  *
827  * @param cls the listen handle
828  * @param msg the message
829  */
830 static void
831 handle_request (void *cls,
832                 const struct GNUNET_SET_RequestMessage *msg)
833 {
834   struct GNUNET_SET_ListenHandle *lh = cls;
835   struct GNUNET_SET_Request req;
836   const struct GNUNET_MessageHeader *context_msg;
837   struct GNUNET_MQ_Envelope *mqm;
838   struct GNUNET_SET_RejectMessage *rmsg;
839
840   LOG (GNUNET_ERROR_TYPE_DEBUG,
841        "Processing incoming operation request\n");
842   /* we got another valid request => reset the backoff */
843   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
844   req.accept_id = ntohl (msg->accept_id);
845   req.accepted = GNUNET_NO;
846   context_msg = GNUNET_MQ_extract_nested_mh (msg);
847   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
848   lh->listen_cb (lh->listen_cls,
849                  &msg->peer_id,
850                  context_msg,
851                  &req);
852   if (GNUNET_YES == req.accepted)
853     return; /* the accept-case is handled in #GNUNET_SET_accept() */
854   LOG (GNUNET_ERROR_TYPE_DEBUG,
855        "Rejecting request\n");
856   mqm = GNUNET_MQ_msg (rmsg,
857                        GNUNET_MESSAGE_TYPE_SET_REJECT);
858   rmsg->accept_reject_id = msg->accept_id;
859   GNUNET_MQ_send (lh->mq, mqm);
860 }
861
862
863 /**
864  * Our connection with the set service encountered an error,
865  * re-initialize with exponential back-off.
866  *
867  * @param cls the `struct GNUNET_SET_ListenHandle *`
868  * @param error reason for the disconnect
869  */
870 static void
871 handle_client_listener_error (void *cls,
872                               enum GNUNET_MQ_Error error)
873 {
874   struct GNUNET_SET_ListenHandle *lh = cls;
875
876   LOG (GNUNET_ERROR_TYPE_DEBUG,
877        "Listener broke down (%d), re-connecting\n",
878        (int) error);
879   GNUNET_MQ_destroy (lh->mq);
880   lh->mq = NULL;
881   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
882                                                      &listen_connect,
883                                                      lh);
884   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
885 }
886
887
888 /**
889  * Connect to the set service in order to listen for requests.
890  *
891  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
892  */
893 static void
894 listen_connect (void *cls)
895 {
896   GNUNET_MQ_hd_var_size (request,
897                          GNUNET_MESSAGE_TYPE_SET_REQUEST,
898                          struct GNUNET_SET_RequestMessage);
899   struct GNUNET_SET_ListenHandle *lh = cls;
900   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
901     make_request_handler (lh),
902     GNUNET_MQ_handler_end ()
903   };
904   struct GNUNET_MQ_Envelope *mqm;
905   struct GNUNET_SET_ListenMessage *msg;
906
907   lh->reconnect_task = NULL;
908   GNUNET_assert (NULL == lh->mq);
909   lh->mq = GNUNET_CLIENT_connecT (lh->cfg,
910                                   "set",
911                                   mq_handlers,
912                                   &handle_client_listener_error,
913                                   lh);
914   if (NULL == lh->mq)
915     return;
916   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
917   msg->operation = htonl (lh->operation);
918   msg->app_id = lh->app_id;
919   GNUNET_MQ_send (lh->mq,
920                   mqm);
921 }
922
923
924 /**
925  * Wait for set operation requests for the given application id
926  *
927  * @param cfg configuration to use for connecting to
928  *            the set service, needs to be valid for the lifetime of the listen handle
929  * @param operation operation we want to listen for
930  * @param app_id id of the application that handles set operation requests
931  * @param listen_cb called for each incoming request matching the operation
932  *                  and application id
933  * @param listen_cls handle for @a listen_cb
934  * @return a handle that can be used to cancel the listen operation
935  */
936 struct GNUNET_SET_ListenHandle *
937 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
938                    enum GNUNET_SET_OperationType operation,
939                    const struct GNUNET_HashCode *app_id,
940                    GNUNET_SET_ListenCallback listen_cb,
941                    void *listen_cls)
942 {
943   struct GNUNET_SET_ListenHandle *lh;
944
945   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
946   lh->listen_cb = listen_cb;
947   lh->listen_cls = listen_cls;
948   lh->cfg = cfg;
949   lh->operation = operation;
950   lh->app_id = *app_id;
951   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
952   listen_connect (lh);
953   if (NULL == lh->mq)
954   {
955     GNUNET_free (lh);
956     return NULL;
957   }
958   return lh;
959 }
960
961
962 /**
963  * Cancel the given listen operation.
964  *
965  * @param lh handle for the listen operation
966  */
967 void
968 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
969 {
970   LOG (GNUNET_ERROR_TYPE_DEBUG,
971        "Canceling listener\n");
972   if (NULL != lh->mq)
973   {
974     GNUNET_MQ_destroy (lh->mq);
975     lh->mq = NULL;
976   }
977   if (NULL != lh->reconnect_task)
978   {
979     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
980     lh->reconnect_task = NULL;
981   }
982   GNUNET_free (lh);
983 }
984
985
986 /**
987  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
988  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
989  * afterwards.
990  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
991  * and to begin the exchange with the remote peer.
992  *
993  * @param request request to accept
994  * @param result_mode specified how results will be returned,
995  *        see `enum GNUNET_SET_ResultMode`.
996  * @param result_cb callback for the results
997  * @param result_cls closure for @a result_cb
998  * @return a handle to cancel the operation
999  */
1000 struct GNUNET_SET_OperationHandle *
1001 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1002                    enum GNUNET_SET_ResultMode result_mode,
1003                    GNUNET_SET_ResultIterator result_cb,
1004                    void *result_cls)
1005 {
1006   struct GNUNET_MQ_Envelope *mqm;
1007   struct GNUNET_SET_OperationHandle *oh;
1008   struct GNUNET_SET_AcceptMessage *msg;
1009
1010   GNUNET_assert (GNUNET_NO == request->accepted);
1011   request->accepted = GNUNET_YES;
1012   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1013   msg->accept_reject_id = htonl (request->accept_id);
1014   msg->result_mode = htonl (result_mode);
1015   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1016   oh->result_cb = result_cb;
1017   oh->result_cls = result_cls;
1018   oh->conclude_mqm = mqm;
1019   oh->request_id_addr = &msg->request_id;
1020   return oh;
1021 }
1022
1023
1024 /**
1025  * Commit a set to be used with a set operation.
1026  * This function is called once we have fully constructed
1027  * the set that we want to use for the operation.  At this
1028  * time, the P2P protocol can then begin to exchange the
1029  * set information and call the result callback with the
1030  * result information.
1031  *
1032  * @param oh handle to the set operation
1033  * @param set the set to use for the operation
1034  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1035  *         set is invalid (e.g. the set service crashed)
1036  */
1037 int
1038 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1039                    struct GNUNET_SET_Handle *set)
1040 {
1041   if (NULL != oh->set)
1042   {
1043     /* Some other set was already commited for this
1044      * operation, there is a logic bug in the client of this API */
1045     GNUNET_break (0);
1046     return GNUNET_OK;
1047   }
1048   if (GNUNET_YES == set->invalid)
1049     return GNUNET_SYSERR;
1050   GNUNET_assert (NULL != oh->conclude_mqm);
1051   oh->set = set;
1052   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1053                                set->ops_tail,
1054                                oh);
1055   oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
1056   *oh->request_id_addr = htonl (oh->request_id);
1057   GNUNET_MQ_send (set->mq, oh->conclude_mqm);
1058   oh->conclude_mqm = NULL;
1059   oh->request_id_addr = NULL;
1060   return GNUNET_OK;
1061 }
1062
1063
1064 /**
1065  * Iterate over all elements in the given set.  Note that this
1066  * operation involves transferring every element of the set from the
1067  * service to the client, and is thus costly.
1068  *
1069  * @param set the set to iterate over
1070  * @param iter the iterator to call for each element
1071  * @param iter_cls closure for @a iter
1072  * @return #GNUNET_YES if the iteration started successfuly,
1073  *         #GNUNET_NO if another iteration is active
1074  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1075  */
1076 int
1077 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1078                     GNUNET_SET_ElementIterator iter,
1079                     void *iter_cls)
1080 {
1081   struct GNUNET_MQ_Envelope *ev;
1082
1083   GNUNET_assert (NULL != iter);
1084   if (GNUNET_YES == set->invalid)
1085     return GNUNET_SYSERR;
1086   if (NULL != set->iterator)
1087     return GNUNET_NO;
1088   LOG (GNUNET_ERROR_TYPE_DEBUG,
1089        "Iterating over set\n");
1090   set->iterator = iter;
1091   set->iterator_cls = iter_cls;
1092   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1093   GNUNET_MQ_send (set->mq, ev);
1094   return GNUNET_YES;
1095 }
1096
1097
1098 void
1099 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1100                       GNUNET_SET_CopyReadyCallback cb,
1101                       void *cls)
1102 {
1103   struct GNUNET_MQ_Envelope *ev;
1104   struct SetCopyRequest *req;
1105
1106   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1107   GNUNET_MQ_send (set->mq, ev);
1108
1109   req = GNUNET_new (struct SetCopyRequest);
1110   req->cb = cb;
1111   req->cls = cls;
1112   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1113                                set->copy_req_tail,
1114                                req);
1115 }
1116
1117
1118 /**
1119  * Create a copy of an element.  The copy
1120  * must be GNUNET_free-d by the caller.
1121  *
1122  * @param element the element to copy
1123  * @return the copied element
1124  */
1125 struct GNUNET_SET_Element *
1126 GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
1127 {
1128   struct GNUNET_SET_Element *copy;
1129
1130   copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
1131   copy->size = element->size;
1132   copy->element_type = element->element_type;
1133   copy->data = &copy[1];
1134   memcpy ((void *) copy->data, element->data, copy->size);
1135
1136   return copy;
1137 }
1138
1139
1140 /**
1141  * Hash a set element.
1142  *
1143  * @param element the element that should be hashed
1144  * @param[out] ret_hash a pointer to where the hash of @a element
1145  *        should be stored
1146  */
1147 void
1148 GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
1149                          struct GNUNET_HashCode *ret_hash)
1150 {
1151   struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
1152
1153   /* It's not guaranteed that the element data is always after the element header,
1154      so we need to hash the chunks separately. */
1155   GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
1156   GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
1157   GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
1158   GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
1159 }
1160
1161 /* end of set_api.c */