guix-env: some update.
[oweals/gnunet.git] / src / set / set_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/set_api.c
22  * @brief api for the set service
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_set_service.h"
30 #include "set.h"
31
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
34
35 struct SetCopyRequest
36 {
37   struct SetCopyRequest *next;
38
39   struct SetCopyRequest *prev;
40
41   void *cls;
42
43   GNUNET_SET_CopyReadyCallback cb;
44 };
45
46 /**
47  * Opaque handle to a set.
48  */
49 struct GNUNET_SET_Handle
50 {
51   /**
52    * Message queue for @e client.
53    */
54   struct GNUNET_MQ_Handle *mq;
55
56   /**
57    * Linked list of operations on the set.
58    */
59   struct GNUNET_SET_OperationHandle *ops_head;
60
61   /**
62    * Linked list of operations on the set.
63    */
64   struct GNUNET_SET_OperationHandle *ops_tail;
65
66   /**
67    * Callback for the current iteration over the set,
68    * NULL if no iterator is active.
69    */
70   GNUNET_SET_ElementIterator iterator;
71
72   /**
73    * Closure for @e iterator
74    */
75   void *iterator_cls;
76
77   /**
78    * Should the set be destroyed once all operations are gone?
79    * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
80    * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
81    */
82   int destroy_requested;
83
84   /**
85    * Has the set become invalid (e.g. service died)?
86    */
87   int invalid;
88
89   /**
90    * Both client and service count the number of iterators
91    * created so far to match replies with iterators.
92    */
93   uint16_t iteration_id;
94
95   /**
96    * Configuration, needed when creating (lazy) copies.
97    */
98   const struct GNUNET_CONFIGURATION_Handle *cfg;
99
100   /**
101    * Doubly linked list of copy requests.
102    */
103   struct SetCopyRequest *copy_req_head;
104
105   /**
106    * Doubly linked list of copy requests.
107    */
108   struct SetCopyRequest *copy_req_tail;
109 };
110
111
112 /**
113  * Handle for a set operation request from another peer.
114  */
115 struct GNUNET_SET_Request
116 {
117   /**
118    * Id of the request, used to identify the request when
119    * accepting/rejecting it.
120    */
121   uint32_t accept_id;
122
123   /**
124    * Has the request been accepted already?
125    * #GNUNET_YES/#GNUNET_NO
126    */
127   int accepted;
128 };
129
130
131 /**
132  * Handle to an operation.  Only known to the service after committing
133  * the handle with a set.
134  */
135 struct GNUNET_SET_OperationHandle
136 {
137   /**
138    * Function to be called when we have a result,
139    * or an error.
140    */
141   GNUNET_SET_ResultIterator result_cb;
142
143   /**
144    * Closure for @e result_cb.
145    */
146   void *result_cls;
147
148   /**
149    * Local set used for the operation,
150    * NULL if no set has been provided by conclude yet.
151    */
152   struct GNUNET_SET_Handle *set;
153
154   /**
155    * Message sent to the server on calling conclude,
156    * NULL if conclude has been called.
157    */
158   struct GNUNET_MQ_Envelope *conclude_mqm;
159
160   /**
161    * Address of the request if in the conclude message,
162    * used to patch the request id into the message when the set is known.
163    */
164   uint32_t *request_id_addr;
165
166   /**
167    * Handles are kept in a linked list.
168    */
169   struct GNUNET_SET_OperationHandle *prev;
170
171   /**
172    * Handles are kept in a linked list.
173    */
174   struct GNUNET_SET_OperationHandle *next;
175
176   /**
177    * Request ID to identify the operation within the set.
178    */
179   uint32_t request_id;
180 };
181
182
183 /**
184  * Opaque handle to a listen operation.
185  */
186 struct GNUNET_SET_ListenHandle
187 {
188
189   /**
190    * Message queue for the client.
191    */
192   struct GNUNET_MQ_Handle* mq;
193
194   /**
195    * Configuration handle for the listener, stored
196    * here to be able to reconnect transparently on
197    * connection failure.
198    */
199   const struct GNUNET_CONFIGURATION_Handle *cfg;
200
201   /**
202    * Function to call on a new incoming request,
203    * or on error.
204    */
205   GNUNET_SET_ListenCallback listen_cb;
206
207   /**
208    * Closure for @e listen_cb.
209    */
210   void *listen_cls;
211
212   /**
213    * Application ID we listen for.
214    */
215   struct GNUNET_HashCode app_id;
216
217   /**
218    * Time to wait until we try to reconnect on failure.
219    */
220   struct GNUNET_TIME_Relative reconnect_backoff;
221
222   /**
223    * Task for reconnecting when the listener fails.
224    */
225   struct GNUNET_SCHEDULER_Task *reconnect_task;
226
227   /**
228    * Operation we listen for.
229    */
230   enum GNUNET_SET_OperationType operation;
231 };
232
233
234 /* mutual recursion with handle_copy_lazy */
235 static struct GNUNET_SET_Handle *
236 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
237                  enum GNUNET_SET_OperationType op,
238                  const uint32_t *cookie);
239
240
241 /**
242  * Handle element for iteration over the set.  Notifies the
243  * iterator and sends an acknowledgement to the service.
244  *
245  * @param cls the `struct GNUNET_SET_Handle *`
246  * @param msg the message
247  */
248 static void
249 handle_copy_lazy (void *cls,
250                   const struct GNUNET_SET_CopyLazyResponseMessage *msg)
251 {
252   struct GNUNET_SET_Handle *set = cls;
253   struct SetCopyRequest *req;
254   struct GNUNET_SET_Handle *new_set;
255
256   req = set->copy_req_head;
257   if (NULL == req)
258   {
259     /* Service sent us unsolicited lazy copy response */
260     GNUNET_break (0);
261     return;
262   }
263
264   LOG (GNUNET_ERROR_TYPE_DEBUG,
265        "Handling response to lazy copy\n");
266   GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
267                                set->copy_req_tail,
268                                req);
269   // We pass none as operation here, since it doesn't matter when
270   // cloning.
271   new_set = create_internal (set->cfg,
272                              GNUNET_SET_OPERATION_NONE,
273                              &msg->cookie);
274   req->cb (req->cls, new_set);
275   GNUNET_free (req);
276 }
277
278
279 /**
280  * Check that the given @a msg is well-formed.
281  *
282  * @param cls closure
283  * @param msg message to check
284  * @return #GNUNET_OK if message is well-formed
285  */
286 static int
287 check_iter_element (void *cls,
288                     const struct GNUNET_SET_IterResponseMessage *msg)
289 {
290   /* minimum size was already checked, everything else is OK! */
291   return GNUNET_OK;
292 }
293
294
295 /**
296  * Handle element for iteration over the set.  Notifies the
297  * iterator and sends an acknowledgement to the service.
298  *
299  * @param cls the `struct GNUNET_SET_Handle *`
300  * @param mh the message
301  */
302 static void
303 handle_iter_element (void *cls,
304                      const struct GNUNET_SET_IterResponseMessage *msg)
305 {
306   struct GNUNET_SET_Handle *set = cls;
307   GNUNET_SET_ElementIterator iter = set->iterator;
308   struct GNUNET_SET_Element element;
309   struct GNUNET_SET_IterAckMessage *ack_msg;
310   struct GNUNET_MQ_Envelope *ev;
311   uint16_t msize;
312
313   LOG (GNUNET_ERROR_TYPE_DEBUG,
314        "Received element in set iteration\n");
315   msize = ntohs (msg->header.size);
316   if (set->iteration_id != ntohs (msg->iteration_id))
317   {
318     /* element from a previous iteration, skip! */
319     iter = NULL;
320   }
321   if (NULL != iter)
322   {
323     element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
324     element.element_type = ntohs (msg->element_type);
325     element.data = &msg[1];
326     iter (set->iterator_cls,
327           &element);
328   }
329   ev = GNUNET_MQ_msg (ack_msg,
330                       GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
331   ack_msg->send_more = htonl ((NULL != iter));
332   GNUNET_MQ_send (set->mq, ev);
333 }
334
335
336 /**
337  * Handle message signalling conclusion of iteration over the set.
338  * Notifies the iterator that we are done.
339  *
340  * @param cls the set
341  * @param mh the message
342  */
343 static void
344 handle_iter_done (void *cls,
345                   const struct GNUNET_MessageHeader *mh)
346 {
347   struct GNUNET_SET_Handle *set = cls;
348   GNUNET_SET_ElementIterator iter = set->iterator;
349
350   if (NULL == iter)
351   {
352     /* FIXME: if this is true, could cancel+start a fresh one
353        cause elements to go to the wrong iteration? */
354     LOG (GNUNET_ERROR_TYPE_INFO,
355          "Service completed set iteration that was already cancelled\n");
356     return;
357   }
358   LOG (GNUNET_ERROR_TYPE_DEBUG,
359        "Set iteration completed\n");
360   set->destroy_requested = GNUNET_SYSERR;
361   set->iterator = NULL;
362   set->iteration_id++;
363   iter (set->iterator_cls,
364         NULL);
365   if (GNUNET_SYSERR == set->destroy_requested)
366     set->destroy_requested = GNUNET_NO;
367   if (GNUNET_YES == set->destroy_requested)
368     GNUNET_SET_destroy (set);
369 }
370
371
372 /**
373  * Check that the given @a msg is well-formed.
374  *
375  * @param cls closure
376  * @param msg message to check
377  * @return #GNUNET_OK if message is well-formed
378  */
379 static int
380 check_result (void *cls,
381               const struct GNUNET_SET_ResultMessage *msg)
382 {
383   /* minimum size was already checked, everything else is OK! */
384   return GNUNET_OK;
385 }
386
387
388 /**
389  * Handle result message for a set operation.
390  *
391  * @param cls the set
392  * @param mh the message
393  */
394 static void
395 handle_result (void *cls,
396                const struct GNUNET_SET_ResultMessage *msg)
397 {
398   struct GNUNET_SET_Handle *set = cls;
399   struct GNUNET_SET_OperationHandle *oh;
400   struct GNUNET_SET_Element e;
401   enum GNUNET_SET_Status result_status;
402   int destroy_set;
403
404   GNUNET_assert (NULL != set->mq);
405   result_status = (enum GNUNET_SET_Status) ntohs (msg->result_status);
406   LOG (GNUNET_ERROR_TYPE_DEBUG,
407        "Got result message with status %d\n",
408        result_status);
409
410   oh = GNUNET_MQ_assoc_get (set->mq,
411                             ntohl (msg->request_id));
412   if (NULL == oh)
413   {
414     /* 'oh' can be NULL if we canceled the operation, but the service
415        did not get the cancel message yet. */
416     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417                 "Ignoring result from canceled operation\n");
418     return;
419   }
420
421   switch (result_status)
422   {
423     case GNUNET_SET_STATUS_OK:
424     case GNUNET_SET_STATUS_ADD_LOCAL:
425     case GNUNET_SET_STATUS_ADD_REMOTE:
426       goto do_element;
427     case GNUNET_SET_STATUS_FAILURE:
428     case GNUNET_SET_STATUS_DONE:
429       goto do_final;
430     case GNUNET_SET_STATUS_HALF_DONE:
431       /* not used anymore */
432       GNUNET_assert (0);
433   }
434
435 do_final:
436   LOG (GNUNET_ERROR_TYPE_DEBUG,
437        "Treating result as final status\n");
438   GNUNET_MQ_assoc_remove (set->mq,
439                           ntohl (msg->request_id));
440   GNUNET_CONTAINER_DLL_remove (set->ops_head,
441                                set->ops_tail,
442                                oh);
443   /* Need to do this calculation _before_ the result callback,
444      as IF the application still has a valid set handle, it
445      may trigger destruction of the set during the callback. */
446   destroy_set = (GNUNET_YES == set->destroy_requested) &&
447                 (NULL == set->ops_head);
448   if (NULL != oh->result_cb)
449   {
450     oh->result_cb (oh->result_cls,
451                    NULL,
452                    GNUNET_ntohll (msg->current_size),
453                    result_status);
454   }
455   else
456   {
457     LOG (GNUNET_ERROR_TYPE_DEBUG,
458          "No callback for final status\n");
459   }
460   if (destroy_set)
461     GNUNET_SET_destroy (set);
462   GNUNET_free (oh);
463   return;
464
465 do_element:
466   LOG (GNUNET_ERROR_TYPE_DEBUG,
467        "Treating result as element\n");
468   e.data = &msg[1];
469   e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
470   e.element_type = ntohs (msg->element_type);
471   if (NULL != oh->result_cb)
472     oh->result_cb (oh->result_cls,
473                    &e,
474                    GNUNET_ntohll (msg->current_size),
475                    result_status);
476 }
477
478
479 /**
480  * Destroy the given set operation.
481  *
482  * @param oh set operation to destroy
483  */
484 static void
485 set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
486 {
487   struct GNUNET_SET_Handle *set = oh->set;
488   struct GNUNET_SET_OperationHandle *h_assoc;
489
490   if (NULL != oh->conclude_mqm)
491     GNUNET_MQ_discard (oh->conclude_mqm);
492   /* is the operation already commited? */
493   if (NULL != set)
494   {
495     GNUNET_CONTAINER_DLL_remove (set->ops_head,
496                                  set->ops_tail,
497                                  oh);
498     h_assoc = GNUNET_MQ_assoc_remove (set->mq,
499                                       oh->request_id);
500     GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
501   }
502   GNUNET_free (oh);
503 }
504
505
506 /**
507  * Cancel the given set operation.  We need to send an explicit cancel
508  * message, as all operations one one set communicate using one
509  * handle.
510  *
511  * @param oh set operation to cancel
512  */
513 void
514 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
515 {
516   struct GNUNET_SET_Handle *set = oh->set;
517   struct GNUNET_SET_CancelMessage *m;
518   struct GNUNET_MQ_Envelope *mqm;
519
520   LOG (GNUNET_ERROR_TYPE_DEBUG,
521        "Cancelling SET operation\n");
522   if (NULL != set)
523   {
524     mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
525     m->request_id = htonl (oh->request_id);
526     GNUNET_MQ_send (set->mq, mqm);
527   }
528   set_operation_destroy (oh);
529   if ( (NULL != set) &&
530        (GNUNET_YES == set->destroy_requested) &&
531        (NULL == set->ops_head) )
532   {
533     LOG (GNUNET_ERROR_TYPE_DEBUG,
534          "Destroying set after operation cancel\n");
535     GNUNET_SET_destroy (set);
536   }
537 }
538
539
540 /**
541  * We encountered an error communicating with the set service while
542  * performing a set operation. Report to the application.
543  *
544  * @param cls the `struct GNUNET_SET_Handle`
545  * @param error error code
546  */
547 static void
548 handle_client_set_error (void *cls,
549                          enum GNUNET_MQ_Error error)
550 {
551   struct GNUNET_SET_Handle *set = cls;
552   GNUNET_SET_ElementIterator iter = set->iterator;
553
554   LOG (GNUNET_ERROR_TYPE_ERROR,
555        "Handling client set error %d\n",
556        error);
557   while (NULL != set->ops_head)
558   {
559     if (NULL != set->ops_head->result_cb)
560       set->ops_head->result_cb (set->ops_head->result_cls,
561                                 NULL,
562                                 0,
563                                 GNUNET_SET_STATUS_FAILURE);
564     set_operation_destroy (set->ops_head);
565   }
566   set->iterator = NULL;
567   set->iteration_id++;
568   set->invalid = GNUNET_YES;
569   if (NULL != iter)
570     iter (set->iterator_cls,
571           NULL);
572 }
573
574
575 /**
576  * FIXME.
577  */
578 static struct GNUNET_SET_Handle *
579 create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
580                  enum GNUNET_SET_OperationType op,
581                  const uint32_t *cookie)
582 {
583   struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
584   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
585     GNUNET_MQ_hd_var_size (result,
586                            GNUNET_MESSAGE_TYPE_SET_RESULT,
587                            struct GNUNET_SET_ResultMessage,
588                            set),
589     GNUNET_MQ_hd_var_size (iter_element,
590                            GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
591                            struct GNUNET_SET_IterResponseMessage,
592                            set),
593     GNUNET_MQ_hd_fixed_size (iter_done,
594                              GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
595                              struct GNUNET_MessageHeader,
596                              set),
597     GNUNET_MQ_hd_fixed_size (copy_lazy,
598                              GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
599                              struct GNUNET_SET_CopyLazyResponseMessage,
600                              set),
601     GNUNET_MQ_handler_end ()
602   };
603   struct GNUNET_MQ_Envelope *mqm;
604   struct GNUNET_SET_CreateMessage *create_msg;
605   struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
606
607   set->cfg = cfg;
608   set->mq = GNUNET_CLIENT_connect (cfg,
609                                    "set",
610                                    mq_handlers,
611                                    &handle_client_set_error,
612                                    set);
613   if (NULL == set->mq)
614   {
615     GNUNET_free (set);
616     return NULL;
617   }
618   if (NULL == cookie)
619   {
620     LOG (GNUNET_ERROR_TYPE_DEBUG,
621          "Creating new set (operation %u)\n",
622          op);
623     mqm = GNUNET_MQ_msg (create_msg,
624                          GNUNET_MESSAGE_TYPE_SET_CREATE);
625     create_msg->operation = htonl (op);
626   }
627   else
628   {
629     LOG (GNUNET_ERROR_TYPE_DEBUG,
630          "Creating new set (lazy copy)\n",
631          op);
632     mqm = GNUNET_MQ_msg (copy_msg,
633                          GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
634     copy_msg->cookie = *cookie;
635   }
636   GNUNET_MQ_send (set->mq,
637                   mqm);
638   return set;
639 }
640
641
642 /**
643  * Create an empty set, supporting the specified operation.
644  *
645  * @param cfg configuration to use for connecting to the
646  *        set service
647  * @param op operation supported by the set
648  *        Note that the operation has to be specified
649  *        beforehand, as certain set operations need to maintain
650  *        data structures spefific to the operation
651  * @return a handle to the set
652  */
653 struct GNUNET_SET_Handle *
654 GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
655                    enum GNUNET_SET_OperationType op)
656 {
657   struct GNUNET_SET_Handle *set;
658
659   set = create_internal (cfg,
660                           op,
661                           NULL);
662   LOG (GNUNET_ERROR_TYPE_DEBUG,
663        "Creating set %p for operation %d\n",
664        set,
665        op);
666   return set;
667 }
668
669
670 /**
671  * Add an element to the given set.  After the element has been added
672  * (in the sense of being transmitted to the set service), @a cont
673  * will be called.  Multiple calls to GNUNET_SET_add_element() can be
674  * queued.
675  *
676  * @param set set to add element to
677  * @param element element to add to the set
678  * @param cont continuation called after the element has been added
679  * @param cont_cls closure for @a cont
680  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
681  *         set is invalid (e.g. the set service crashed)
682  */
683 int
684 GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
685                         const struct GNUNET_SET_Element *element,
686                         GNUNET_SET_Continuation cont,
687                         void *cont_cls)
688 {
689   struct GNUNET_MQ_Envelope *mqm;
690   struct GNUNET_SET_ElementMessage *msg;
691
692   LOG (GNUNET_ERROR_TYPE_INFO,
693        "adding element of type %u to set %p\n",
694        (unsigned int) element->element_type,
695        set);
696   if (GNUNET_YES == set->invalid)
697   {
698     if (NULL != cont)
699       cont (cont_cls);
700     return GNUNET_SYSERR;
701   }
702   mqm = GNUNET_MQ_msg_extra (msg,
703                              element->size,
704                              GNUNET_MESSAGE_TYPE_SET_ADD);
705   msg->element_type = htons (element->element_type);
706   GNUNET_memcpy (&msg[1],
707                  element->data,
708                  element->size);
709   GNUNET_MQ_notify_sent (mqm,
710                          cont, cont_cls);
711   GNUNET_MQ_send (set->mq, mqm);
712   return GNUNET_OK;
713 }
714
715
716 /**
717  * Remove an element to the given set.  After the element has been
718  * removed (in the sense of the request being transmitted to the set
719  * service), @a cont will be called.  Multiple calls to
720  * GNUNET_SET_remove_element() can be queued
721  *
722  * @param set set to remove element from
723  * @param element element to remove from the set
724  * @param cont continuation called after the element has been removed
725  * @param cont_cls closure for @a cont
726  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
727  *         set is invalid (e.g. the set service crashed)
728  */
729 int
730 GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
731                            const struct GNUNET_SET_Element *element,
732                            GNUNET_SET_Continuation cont,
733                            void *cont_cls)
734 {
735   struct GNUNET_MQ_Envelope *mqm;
736   struct GNUNET_SET_ElementMessage *msg;
737
738   LOG (GNUNET_ERROR_TYPE_DEBUG,
739        "Removing element from set %p\n",
740        set);
741   if (GNUNET_YES == set->invalid)
742   {
743     if (NULL != cont)
744       cont (cont_cls);
745     return GNUNET_SYSERR;
746   }
747   mqm = GNUNET_MQ_msg_extra (msg,
748                              element->size,
749                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
750   msg->element_type = htons (element->element_type);
751   GNUNET_memcpy (&msg[1],
752                  element->data,
753                  element->size);
754   GNUNET_MQ_notify_sent (mqm,
755                          cont, cont_cls);
756   GNUNET_MQ_send (set->mq, mqm);
757   return GNUNET_OK;
758 }
759
760
761 /**
762  * Destroy the set handle if no operations are left, mark the set
763  * for destruction otherwise.
764  *
765  * @param set set handle to destroy
766  */
767 void
768 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
769 {
770   /* destroying set while iterator is active is currently
771      not supported; we should expand the API to allow
772      clients to explicitly cancel the iteration! */
773   if ( (NULL != set->ops_head) ||
774        (NULL != set->iterator) ||
775        (GNUNET_SYSERR == set->destroy_requested) )
776   {
777     LOG (GNUNET_ERROR_TYPE_DEBUG,
778          "Set operations are pending, delaying set destruction\n");
779     set->destroy_requested = GNUNET_YES;
780     return;
781   }
782   LOG (GNUNET_ERROR_TYPE_DEBUG,
783        "Really destroying set\n");
784   if (NULL != set->mq)
785   {
786     GNUNET_MQ_destroy (set->mq);
787     set->mq = NULL;
788   }
789   GNUNET_free (set);
790 }
791
792
793 /**
794  * Prepare a set operation to be evaluated with another peer.
795  * The evaluation will not start until the client provides
796  * a local set with #GNUNET_SET_commit().
797  *
798  * @param other_peer peer with the other set
799  * @param app_id hash for the application using the set
800  * @param context_msg additional information for the request
801  * @param result_mode specified how results will be returned,
802  *        see `enum GNUNET_SET_ResultMode`.
803  * @param result_cb called on error or success
804  * @param result_cls closure for @e result_cb
805  * @return a handle to cancel the operation
806  */
807 struct GNUNET_SET_OperationHandle *
808 GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
809                     const struct GNUNET_HashCode *app_id,
810                     const struct GNUNET_MessageHeader *context_msg,
811                     enum GNUNET_SET_ResultMode result_mode,
812                     struct GNUNET_SET_Option options[],
813                     GNUNET_SET_ResultIterator result_cb,
814                     void *result_cls)
815 {
816   struct GNUNET_MQ_Envelope *mqm;
817   struct GNUNET_SET_OperationHandle *oh;
818   struct GNUNET_SET_EvaluateMessage *msg;
819   struct GNUNET_SET_Option *opt;
820
821   LOG (GNUNET_ERROR_TYPE_DEBUG,
822        "Client prepares set operation (%d)\n",
823        result_mode);
824   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
825   oh->result_cb = result_cb;
826   oh->result_cls = result_cls;
827   mqm = GNUNET_MQ_msg_nested_mh (msg,
828                                  GNUNET_MESSAGE_TYPE_SET_EVALUATE,
829                                  context_msg);
830   msg->app_id = *app_id;
831   msg->result_mode = htonl (result_mode);
832   msg->target_peer = *other_peer;
833   for (opt = options; opt->type != 0; opt++)
834   {
835     switch (opt->type)
836     {
837       case GNUNET_SET_OPTION_BYZANTINE:
838         msg->byzantine = GNUNET_YES;
839         msg->byzantine_lower_bound = opt->v.num;
840         break;
841       case GNUNET_SET_OPTION_FORCE_FULL:
842         msg->force_full = GNUNET_YES;
843         break;
844       case GNUNET_SET_OPTION_FORCE_DELTA:
845         msg->force_delta = GNUNET_YES;
846         break;
847       default:
848         LOG (GNUNET_ERROR_TYPE_ERROR,
849              "Option with type %d not recognized\n", (int) opt->type);
850     }
851   }
852   oh->conclude_mqm = mqm;
853   oh->request_id_addr = &msg->request_id;
854
855   return oh;
856 }
857
858
859 /**
860  * Connect to the set service in order to listen for requests.
861  *
862  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
863  */
864 static void
865 listen_connect (void *cls);
866
867
868 /**
869  * Check validity of request message for a listen operation
870  *
871  * @param cls the listen handle
872  * @param msg the message
873  * @return #GNUNET_OK if the message is well-formed
874  */
875 static int
876 check_request (void *cls,
877                const struct GNUNET_SET_RequestMessage *msg)
878 {
879   const struct GNUNET_MessageHeader *context_msg;
880
881   if (ntohs (msg->header.size) == sizeof (*msg))
882     return GNUNET_OK; /* no context message is OK */
883   context_msg = GNUNET_MQ_extract_nested_mh (msg);
884   if (NULL == context_msg)
885   {
886     /* malformed context message is NOT ok */
887     GNUNET_break_op (0);
888     return GNUNET_SYSERR;
889   }
890   return GNUNET_OK;
891 }
892
893
894 /**
895  * Handle request message for a listen operation
896  *
897  * @param cls the listen handle
898  * @param msg the message
899  */
900 static void
901 handle_request (void *cls,
902                 const struct GNUNET_SET_RequestMessage *msg)
903 {
904   struct GNUNET_SET_ListenHandle *lh = cls;
905   struct GNUNET_SET_Request req;
906   const struct GNUNET_MessageHeader *context_msg;
907   struct GNUNET_MQ_Envelope *mqm;
908   struct GNUNET_SET_RejectMessage *rmsg;
909
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911        "Processing incoming operation request with id %u\n",
912        ntohl (msg->accept_id));
913   /* we got another valid request => reset the backoff */
914   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
915   req.accept_id = ntohl (msg->accept_id);
916   req.accepted = GNUNET_NO;
917   context_msg = GNUNET_MQ_extract_nested_mh (msg);
918   /* calling #GNUNET_SET_accept() in the listen cb will set req->accepted */
919   lh->listen_cb (lh->listen_cls,
920                  &msg->peer_id,
921                  context_msg,
922                  &req);
923   if (GNUNET_YES == req.accepted)
924     return; /* the accept-case is handled in #GNUNET_SET_accept() */
925   LOG (GNUNET_ERROR_TYPE_DEBUG,
926        "Rejected request %u\n",
927        ntohl (msg->accept_id));
928   mqm = GNUNET_MQ_msg (rmsg,
929                        GNUNET_MESSAGE_TYPE_SET_REJECT);
930   rmsg->accept_reject_id = msg->accept_id;
931   GNUNET_MQ_send (lh->mq, mqm);
932 }
933
934
935 /**
936  * Our connection with the set service encountered an error,
937  * re-initialize with exponential back-off.
938  *
939  * @param cls the `struct GNUNET_SET_ListenHandle *`
940  * @param error reason for the disconnect
941  */
942 static void
943 handle_client_listener_error (void *cls,
944                               enum GNUNET_MQ_Error error)
945 {
946   struct GNUNET_SET_ListenHandle *lh = cls;
947
948   LOG (GNUNET_ERROR_TYPE_DEBUG,
949        "Listener broke down (%d), re-connecting\n",
950        (int) error);
951   GNUNET_MQ_destroy (lh->mq);
952   lh->mq = NULL;
953   lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
954                                                      &listen_connect,
955                                                      lh);
956   lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
957 }
958
959
960 /**
961  * Connect to the set service in order to listen for requests.
962  *
963  * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
964  */
965 static void
966 listen_connect (void *cls)
967 {
968   struct GNUNET_SET_ListenHandle *lh = cls;
969   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
970     GNUNET_MQ_hd_var_size (request,
971                            GNUNET_MESSAGE_TYPE_SET_REQUEST,
972                            struct GNUNET_SET_RequestMessage,
973                            lh),
974     GNUNET_MQ_handler_end ()
975   };
976   struct GNUNET_MQ_Envelope *mqm;
977   struct GNUNET_SET_ListenMessage *msg;
978
979   lh->reconnect_task = NULL;
980   GNUNET_assert (NULL == lh->mq);
981   lh->mq = GNUNET_CLIENT_connect (lh->cfg,
982                                   "set",
983                                   mq_handlers,
984                                   &handle_client_listener_error,
985                                   lh);
986   if (NULL == lh->mq)
987     return;
988   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
989   msg->operation = htonl (lh->operation);
990   msg->app_id = lh->app_id;
991   GNUNET_MQ_send (lh->mq,
992                   mqm);
993 }
994
995
996 /**
997  * Wait for set operation requests for the given application id
998  *
999  * @param cfg configuration to use for connecting to
1000  *            the set service, needs to be valid for the lifetime of the listen handle
1001  * @param operation operation we want to listen for
1002  * @param app_id id of the application that handles set operation requests
1003  * @param listen_cb called for each incoming request matching the operation
1004  *                  and application id
1005  * @param listen_cls handle for @a listen_cb
1006  * @return a handle that can be used to cancel the listen operation
1007  */
1008 struct GNUNET_SET_ListenHandle *
1009 GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1010                    enum GNUNET_SET_OperationType operation,
1011                    const struct GNUNET_HashCode *app_id,
1012                    GNUNET_SET_ListenCallback listen_cb,
1013                    void *listen_cls)
1014 {
1015   struct GNUNET_SET_ListenHandle *lh;
1016
1017   LOG (GNUNET_ERROR_TYPE_DEBUG,
1018        "Starting listener for app %s\n",
1019        GNUNET_h2s (app_id));
1020   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
1021   lh->listen_cb = listen_cb;
1022   lh->listen_cls = listen_cls;
1023   lh->cfg = cfg;
1024   lh->operation = operation;
1025   lh->app_id = *app_id;
1026   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1027   listen_connect (lh);
1028   if (NULL == lh->mq)
1029   {
1030     GNUNET_free (lh);
1031     return NULL;
1032   }
1033   return lh;
1034 }
1035
1036
1037 /**
1038  * Cancel the given listen operation.
1039  *
1040  * @param lh handle for the listen operation
1041  */
1042 void
1043 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
1044 {
1045   LOG (GNUNET_ERROR_TYPE_DEBUG,
1046        "Canceling listener %s\n",
1047        GNUNET_h2s (&lh->app_id));
1048   if (NULL != lh->mq)
1049   {
1050     GNUNET_MQ_destroy (lh->mq);
1051     lh->mq = NULL;
1052   }
1053   if (NULL != lh->reconnect_task)
1054   {
1055     GNUNET_SCHEDULER_cancel (lh->reconnect_task);
1056     lh->reconnect_task = NULL;
1057   }
1058   GNUNET_free (lh);
1059 }
1060
1061
1062 /**
1063  * Accept a request we got via #GNUNET_SET_listen.  Must be called during
1064  * #GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
1065  * afterwards.
1066  * Call #GNUNET_SET_commit to provide the local set to use for the operation,
1067  * and to begin the exchange with the remote peer.
1068  *
1069  * @param request request to accept
1070  * @param result_mode specified how results will be returned,
1071  *        see `enum GNUNET_SET_ResultMode`.
1072  * @param result_cb callback for the results
1073  * @param result_cls closure for @a result_cb
1074  * @return a handle to cancel the operation
1075  */
1076 struct GNUNET_SET_OperationHandle *
1077 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
1078                    enum GNUNET_SET_ResultMode result_mode,
1079                    struct GNUNET_SET_Option options[],
1080                    GNUNET_SET_ResultIterator result_cb,
1081                    void *result_cls)
1082 {
1083   struct GNUNET_MQ_Envelope *mqm;
1084   struct GNUNET_SET_OperationHandle *oh;
1085   struct GNUNET_SET_AcceptMessage *msg;
1086
1087   GNUNET_assert (GNUNET_NO == request->accepted);
1088   LOG (GNUNET_ERROR_TYPE_DEBUG,
1089        "Client accepts set operation (%d) with id %u\n",
1090        result_mode,
1091        request->accept_id);
1092   request->accepted = GNUNET_YES;
1093   mqm = GNUNET_MQ_msg (msg,
1094                        GNUNET_MESSAGE_TYPE_SET_ACCEPT);
1095   msg->accept_reject_id = htonl (request->accept_id);
1096   msg->result_mode = htonl (result_mode);
1097   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
1098   oh->result_cb = result_cb;
1099   oh->result_cls = result_cls;
1100   oh->conclude_mqm = mqm;
1101   oh->request_id_addr = &msg->request_id;
1102   return oh;
1103 }
1104
1105
1106 /**
1107  * Commit a set to be used with a set operation.
1108  * This function is called once we have fully constructed
1109  * the set that we want to use for the operation.  At this
1110  * time, the P2P protocol can then begin to exchange the
1111  * set information and call the result callback with the
1112  * result information.
1113  *
1114  * @param oh handle to the set operation
1115  * @param set the set to use for the operation
1116  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
1117  *         set is invalid (e.g. the set service crashed)
1118  */
1119 int
1120 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
1121                    struct GNUNET_SET_Handle *set)
1122 {
1123   if (NULL != oh->set)
1124   {
1125     /* Some other set was already commited for this
1126      * operation, there is a logic bug in the client of this API */
1127     GNUNET_break (0);
1128     return GNUNET_OK;
1129   }
1130   if (GNUNET_YES == set->invalid)
1131     return GNUNET_SYSERR;
1132   LOG (GNUNET_ERROR_TYPE_DEBUG,
1133        "Client commits to SET\n");
1134   GNUNET_assert (NULL != oh->conclude_mqm);
1135   oh->set = set;
1136   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1137                                set->ops_tail,
1138                                oh);
1139   oh->request_id = GNUNET_MQ_assoc_add (set->mq,
1140                                         oh);
1141   *oh->request_id_addr = htonl (oh->request_id);
1142   GNUNET_MQ_send (set->mq,
1143                   oh->conclude_mqm);
1144   oh->conclude_mqm = NULL;
1145   oh->request_id_addr = NULL;
1146   return GNUNET_OK;
1147 }
1148
1149
1150 /**
1151  * Iterate over all elements in the given set.  Note that this
1152  * operation involves transferring every element of the set from the
1153  * service to the client, and is thus costly.
1154  *
1155  * @param set the set to iterate over
1156  * @param iter the iterator to call for each element
1157  * @param iter_cls closure for @a iter
1158  * @return #GNUNET_YES if the iteration started successfuly,
1159  *         #GNUNET_NO if another iteration is active
1160  *         #GNUNET_SYSERR if the set is invalid (e.g. the server crashed, disconnected)
1161  */
1162 int
1163 GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
1164                     GNUNET_SET_ElementIterator iter,
1165                     void *iter_cls)
1166 {
1167   struct GNUNET_MQ_Envelope *ev;
1168
1169   GNUNET_assert (NULL != iter);
1170   if (GNUNET_YES == set->invalid)
1171     return GNUNET_SYSERR;
1172   if (NULL != set->iterator)
1173     return GNUNET_NO;
1174   LOG (GNUNET_ERROR_TYPE_DEBUG,
1175        "Iterating over set\n");
1176   set->iterator = iter;
1177   set->iterator_cls = iter_cls;
1178   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);
1179   GNUNET_MQ_send (set->mq, ev);
1180   return GNUNET_YES;
1181 }
1182
1183
1184 void
1185 GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set,
1186                       GNUNET_SET_CopyReadyCallback cb,
1187                       void *cls)
1188 {
1189   struct GNUNET_MQ_Envelope *ev;
1190   struct SetCopyRequest *req;
1191
1192   LOG (GNUNET_ERROR_TYPE_DEBUG,
1193        "Creating lazy copy of set\n");
1194   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
1195   GNUNET_MQ_send (set->mq, ev);
1196
1197   req = GNUNET_new (struct SetCopyRequest);
1198   req->cb = cb;
1199   req->cls = cls;
1200   GNUNET_CONTAINER_DLL_insert (set->copy_req_head,
1201                                set->copy_req_tail,
1202                                req);
1203 }
1204
1205
1206 /**
1207  * Create a copy of an element.  The copy
1208  * must be GNUNET_free-d by the caller.
1209  *
1210  * @param element the element to copy
1211  * @return the copied element
1212  */
1213 struct GNUNET_SET_Element *
1214 GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
1215 {
1216   struct GNUNET_SET_Element *copy;
1217
1218   copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
1219   copy->size = element->size;
1220   copy->element_type = element->element_type;
1221   copy->data = &copy[1];
1222   GNUNET_memcpy (&copy[1],
1223                  element->data,
1224                  copy->size);
1225   return copy;
1226 }
1227
1228
1229 /**
1230  * Hash a set element.
1231  *
1232  * @param element the element that should be hashed
1233  * @param[out] ret_hash a pointer to where the hash of @a element
1234  *        should be stored
1235  */
1236 void
1237 GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
1238                          struct GNUNET_HashCode *ret_hash)
1239 {
1240   struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
1241
1242   /* It's not guaranteed that the element data is always after the element header,
1243      so we need to hash the chunks separately. */
1244   GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
1245   GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
1246   GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
1247   GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
1248 }
1249
1250 /* end of set_api.c */