add attestation API
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
31
32 /**
33  * How long do we hold on to an incoming channel if there is
34  * no local listener before giving up?
35  */
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38
39 /**
40  * Lazy copy requests made by a client.
41  */
42 struct LazyCopyRequest
43 {
44   /**
45    * Kept in a DLL.
46    */
47   struct LazyCopyRequest *prev;
48
49   /**
50    * Kept in a DLL.
51    */
52   struct LazyCopyRequest *next;
53
54   /**
55    * Which set are we supposed to copy?
56    */
57   struct Set *source_set;
58
59   /**
60    * Cookie identifying the request.
61    */
62   uint32_t cookie;
63 };
64
65
66 /**
67  * A listener is inhabited by a client, and waits for evaluation
68  * requests from remote peers.
69  */
70 struct Listener
71 {
72   /**
73    * Listeners are held in a doubly linked list.
74    */
75   struct Listener *next;
76
77   /**
78    * Listeners are held in a doubly linked list.
79    */
80   struct Listener *prev;
81
82   /**
83    * Head of DLL of operations this listener is responsible for.
84    * Once the client has accepted/declined the operation, the
85    * operation is moved to the respective set's operation DLLS.
86    */
87   struct Operation *op_head;
88
89   /**
90    * Tail of DLL of operations this listener is responsible for.
91    * Once the client has accepted/declined the operation, the
92    * operation is moved to the respective set's operation DLLS.
93    */
94   struct Operation *op_tail;
95
96   /**
97    * Client that owns the listener.
98    * Only one client may own a listener.
99    */
100   struct ClientState *cs;
101
102   /**
103    * The port we are listening on with CADET.
104    */
105   struct GNUNET_CADET_Port *open_port;
106
107   /**
108    * Application ID for the operation, used to distinguish
109    * multiple operations of the same type with the same peer.
110    */
111   struct GNUNET_HashCode app_id;
112
113   /**
114    * The type of the operation.
115    */
116   enum GNUNET_SET_OperationType operation;
117 };
118
119
120 /**
121  * Handle to the cadet service, used to listen for and connect to
122  * remote peers.
123  */
124 static struct GNUNET_CADET_Handle *cadet;
125
126 /**
127  * DLL of lazy copy requests by this client.
128  */
129 static struct LazyCopyRequest *lazy_copy_head;
130
131 /**
132  * DLL of lazy copy requests by this client.
133  */
134 static struct LazyCopyRequest *lazy_copy_tail;
135
136 /**
137  * Generator for unique cookie we set per lazy copy request.
138  */
139 static uint32_t lazy_copy_cookie;
140
141 /**
142  * Statistics handle.
143  */
144 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
145
146 /**
147  * Listeners are held in a doubly linked list.
148  */
149 static struct Listener *listener_head;
150
151 /**
152  * Listeners are held in a doubly linked list.
153  */
154 static struct Listener *listener_tail;
155
156 /**
157  * Number of active clients.
158  */
159 static unsigned int num_clients;
160
161 /**
162  * Are we in shutdown? if #GNUNET_YES and the number of clients
163  * drops to zero, disconnect from CADET.
164  */
165 static int in_shutdown;
166
167 /**
168  * Counter for allocating unique IDs for clients, used to identify
169  * incoming operation requests from remote peers, that the client can
170  * choose to accept or refuse.  0 must not be used (reserved for
171  * uninitialized).
172  */
173 static uint32_t suggest_id;
174
175
176 /**
177  * Get the incoming socket associated with the given id.
178  *
179  * @param listener the listener to look in
180  * @param id id to look for
181  * @return the incoming socket associated with the id,
182  *         or NULL if there is none
183  */
184 static struct Operation *
185 get_incoming (uint32_t id)
186 {
187   for (struct Listener *listener = listener_head; NULL != listener;
188        listener = listener->next)
189   {
190     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191       if (op->suggest_id == id)
192         return op;
193   }
194   return NULL;
195 }
196
197
198 /**
199  * Destroy an incoming request from a remote peer
200  *
201  * @param op remote request to destroy
202  */
203 static void
204 incoming_destroy (struct Operation *op)
205 {
206   struct Listener *listener;
207
208   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
209               "Destroying incoming operation %p\n",
210               op);
211   if (NULL != (listener = op->listener))
212   {
213     GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
214     op->listener = NULL;
215   }
216   if (NULL != op->timeout_task)
217   {
218     GNUNET_SCHEDULER_cancel (op->timeout_task);
219     op->timeout_task = NULL;
220   }
221   _GSS_operation_destroy2 (op);
222 }
223
224
225 /**
226  * Context for the #garbage_collect_cb().
227  */
228 struct GarbageContext
229 {
230   /**
231    * Map for which we are garbage collecting removed elements.
232    */
233   struct GNUNET_CONTAINER_MultiHashMap *map;
234
235   /**
236    * Lowest generation for which an operation is still pending.
237    */
238   unsigned int min_op_generation;
239
240   /**
241    * Largest generation for which an operation is still pending.
242    */
243   unsigned int max_op_generation;
244 };
245
246
247 /**
248  * Function invoked to check if an element can be removed from
249  * the set's history because it is no longer needed.
250  *
251  * @param cls the `struct GarbageContext *`
252  * @param key key of the element in the map
253  * @param value the `struct ElementEntry *`
254  * @return #GNUNET_OK (continue to iterate)
255  */
256 static int
257 garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
258 {
259   // struct GarbageContext *gc = cls;
260   // struct ElementEntry *ee = value;
261
262   // if (GNUNET_YES != ee->removed)
263   //  return GNUNET_OK;
264   // if ( (gc->max_op_generation < ee->generation_added) ||
265   //     (ee->generation_removed > gc->min_op_generation) )
266   // {
267   //  GNUNET_assert (GNUNET_YES ==
268   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
269   //                                                       key,
270   //                                                       ee));
271   //  GNUNET_free (ee);
272   // }
273   return GNUNET_OK;
274 }
275
276
277 /**
278  * Collect and destroy elements that are not needed anymore, because
279  * their lifetime (as determined by their generation) does not overlap
280  * with any active set operation.
281  *
282  * @param set set to garbage collect
283  */
284 static void
285 collect_generation_garbage (struct Set *set)
286 {
287   struct GarbageContext gc;
288
289   gc.min_op_generation = UINT_MAX;
290   gc.max_op_generation = 0;
291   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
292   {
293     gc.min_op_generation =
294       GNUNET_MIN (gc.min_op_generation, op->generation_created);
295     gc.max_op_generation =
296       GNUNET_MAX (gc.max_op_generation, op->generation_created);
297   }
298   gc.map = set->content->elements;
299   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
300                                          &garbage_collect_cb,
301                                          &gc);
302 }
303
304
305 /**
306  * Is @a generation in the range of exclusions?
307  *
308  * @param generation generation to query
309  * @param excluded array of generations where the element is excluded
310  * @param excluded_size length of the @a excluded array
311  * @return #GNUNET_YES if @a generation is in any of the ranges
312  */
313 static int
314 is_excluded_generation (unsigned int generation,
315                         struct GenerationRange *excluded,
316                         unsigned int excluded_size)
317 {
318   for (unsigned int i = 0; i < excluded_size; i++)
319     if ((generation >= excluded[i].start) && (generation < excluded[i].end))
320       return GNUNET_YES;
321   return GNUNET_NO;
322 }
323
324
325 /**
326  * Is element @a ee part of the set during @a query_generation?
327  *
328  * @param ee element to test
329  * @param query_generation generation to query
330  * @param excluded array of generations where the element is excluded
331  * @param excluded_size length of the @a excluded array
332  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
333  */
334 static int
335 is_element_of_generation (struct ElementEntry *ee,
336                           unsigned int query_generation,
337                           struct GenerationRange *excluded,
338                           unsigned int excluded_size)
339 {
340   struct MutationEvent *mut;
341   int is_present;
342
343   GNUNET_assert (NULL != ee->mutations);
344   if (GNUNET_YES ==
345       is_excluded_generation (query_generation, excluded, excluded_size))
346   {
347     GNUNET_break (0);
348     return GNUNET_NO;
349   }
350
351   is_present = GNUNET_NO;
352
353   /* Could be made faster with binary search, but lists
354      are small, so why bother. */
355   for (unsigned int i = 0; i < ee->mutations_size; i++)
356   {
357     mut = &ee->mutations[i];
358
359     if (mut->generation > query_generation)
360     {
361       /* The mutation doesn't apply to our generation
362          anymore.  We can'b break here, since mutations aren't
363          sorted by generation. */
364       continue;
365     }
366
367     if (GNUNET_YES ==
368         is_excluded_generation (mut->generation, excluded, excluded_size))
369     {
370       /* The generation is excluded (because it belongs to another
371          fork via a lazy copy) and thus mutations aren't considered
372          for membership testing. */
373       continue;
374     }
375
376     /* This would be an inconsistency in how we manage mutations. */
377     if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
378       GNUNET_assert (0);
379     /* Likewise. */
380     if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
381       GNUNET_assert (0);
382
383     is_present = mut->added;
384   }
385
386   return is_present;
387 }
388
389
390 /**
391  * Is element @a ee part of the set used by @a op?
392  *
393  * @param ee element to test
394  * @param op operation the defines the set and its generation
395  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
396  */
397 int
398 _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op)
399 {
400   return is_element_of_generation (ee,
401                                    op->generation_created,
402                                    op->set->excluded_generations,
403                                    op->set->excluded_generations_size);
404 }
405
406
407 /**
408  * Destroy the given operation.  Used for any operation where both
409  * peers were known and that thus actually had a vt and channel.  Must
410  * not be used for operations where 'listener' is still set and we do
411  * not know the other peer.
412  *
413  * Call the implementation-specific cancel function of the operation.
414  * Disconnects from the remote peer.  Does not disconnect the client,
415  * as there may be multiple operations per set.
416  *
417  * @param op operation to destroy
418  * @param gc #GNUNET_YES to perform garbage collection on the set
419  */
420 void
421 _GSS_operation_destroy (struct Operation *op, int gc)
422 {
423   struct Set *set = op->set;
424   struct GNUNET_CADET_Channel *channel;
425
426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
427   GNUNET_assert (NULL == op->listener);
428   if (NULL != op->state)
429   {
430     set->vt->cancel (op);
431     op->state = NULL;
432   }
433   if (NULL != set)
434   {
435     GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op);
436     op->set = NULL;
437   }
438   if (NULL != op->context_msg)
439   {
440     GNUNET_free (op->context_msg);
441     op->context_msg = NULL;
442   }
443   if (NULL != (channel = op->channel))
444   {
445     /* This will free op; called conditionally as this helper function
446        is also called from within the channel disconnect handler. */
447     op->channel = NULL;
448     GNUNET_CADET_channel_destroy (channel);
449   }
450   if ((NULL != set) && (GNUNET_YES == gc))
451     collect_generation_garbage (set);
452   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
453    * there was a channel end handler that will free 'op' on the call stack. */
454 }
455
456
457 /**
458  * Callback called when a client connects to the service.
459  *
460  * @param cls closure for the service
461  * @param c the new client that connected to the service
462  * @param mq the message queue used to send messages to the client
463  * @return @a `struct ClientState`
464  */
465 static void *
466 client_connect_cb (void *cls,
467                    struct GNUNET_SERVICE_Client *c,
468                    struct GNUNET_MQ_Handle *mq)
469 {
470   struct ClientState *cs;
471
472   num_clients++;
473   cs = GNUNET_new (struct ClientState);
474   cs->client = c;
475   cs->mq = mq;
476   return cs;
477 }
478
479
480 /**
481  * Iterator over hash map entries to free element entries.
482  *
483  * @param cls closure
484  * @param key current key code
485  * @param value a `struct ElementEntry *` to be free'd
486  * @return #GNUNET_YES (continue to iterate)
487  */
488 static int
489 destroy_elements_iterator (void *cls,
490                            const struct GNUNET_HashCode *key,
491                            void *value)
492 {
493   struct ElementEntry *ee = value;
494
495   GNUNET_free_non_null (ee->mutations);
496   GNUNET_free (ee);
497   return GNUNET_YES;
498 }
499
500
501 /**
502  * Clean up after a client has disconnected
503  *
504  * @param cls closure, unused
505  * @param client the client to clean up after
506  * @param internal_cls the `struct ClientState`
507  */
508 static void
509 client_disconnect_cb (void *cls,
510                       struct GNUNET_SERVICE_Client *client,
511                       void *internal_cls)
512 {
513   struct ClientState *cs = internal_cls;
514   struct Operation *op;
515   struct Listener *listener;
516   struct Set *set;
517
518   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
519   if (NULL != (set = cs->set))
520   {
521     struct SetContent *content = set->content;
522     struct PendingMutation *pm;
523     struct PendingMutation *pm_current;
524     struct LazyCopyRequest *lcr;
525
526     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
527     /* Destroy pending set operations */
528     while (NULL != set->ops_head)
529       _GSS_operation_destroy (set->ops_head, GNUNET_NO);
530
531     /* Destroy operation-specific state */
532     GNUNET_assert (NULL != set->state);
533     set->vt->destroy_set (set->state);
534     set->state = NULL;
535
536     /* Clean up ongoing iterations */
537     if (NULL != set->iter)
538     {
539       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
540       set->iter = NULL;
541       set->iteration_id++;
542     }
543
544     /* discard any pending mutations that reference this set */
545     pm = content->pending_mutations_head;
546     while (NULL != pm)
547     {
548       pm_current = pm;
549       pm = pm->next;
550       if (pm_current->set == set)
551       {
552         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
553                                      content->pending_mutations_tail,
554                                      pm_current);
555         GNUNET_free (pm_current);
556       }
557     }
558
559     /* free set content (or at least decrement RC) */
560     set->content = NULL;
561     GNUNET_assert (0 != content->refcount);
562     content->refcount--;
563     if (0 == content->refcount)
564     {
565       GNUNET_assert (NULL != content->elements);
566       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
567                                              &destroy_elements_iterator,
568                                              NULL);
569       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
570       content->elements = NULL;
571       GNUNET_free (content);
572     }
573     GNUNET_free_non_null (set->excluded_generations);
574     set->excluded_generations = NULL;
575
576     /* remove set from pending copy requests */
577     lcr = lazy_copy_head;
578     while (NULL != lcr)
579     {
580       struct LazyCopyRequest *lcr_current = lcr;
581
582       lcr = lcr->next;
583       if (lcr_current->source_set == set)
584       {
585         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
586                                      lazy_copy_tail,
587                                      lcr_current);
588         GNUNET_free (lcr_current);
589       }
590     }
591     GNUNET_free (set);
592   }
593
594   if (NULL != (listener = cs->listener))
595   {
596     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
597     GNUNET_CADET_close_port (listener->open_port);
598     listener->open_port = NULL;
599     while (NULL != (op = listener->op_head))
600     {
601       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
602                   "Destroying incoming operation `%u' from peer `%s'\n",
603                   (unsigned int) op->client_request_id,
604                   GNUNET_i2s (&op->peer));
605       incoming_destroy (op);
606     }
607     GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
608     GNUNET_free (listener);
609   }
610   GNUNET_free (cs);
611   num_clients--;
612   if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
613   {
614     if (NULL != cadet)
615     {
616       GNUNET_CADET_disconnect (cadet);
617       cadet = NULL;
618     }
619   }
620 }
621
622
623 /**
624  * Check a request for a set operation from another peer.
625  *
626  * @param cls the operation state
627  * @param msg the received message
628  * @return #GNUNET_OK if the channel should be kept alive,
629  *         #GNUNET_SYSERR to destroy the channel
630  */
631 static int
632 check_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
633 {
634   struct Operation *op = cls;
635   struct Listener *listener = op->listener;
636   const struct GNUNET_MessageHeader *nested_context;
637
638   /* double operation request */
639   if (0 != op->suggest_id)
640   {
641     GNUNET_break_op (0);
642     return GNUNET_SYSERR;
643   }
644   /* This should be equivalent to the previous condition, but can't hurt to check twice */
645   if (NULL == op->listener)
646   {
647     GNUNET_break (0);
648     return GNUNET_SYSERR;
649   }
650   if (listener->operation !=
651       (enum GNUNET_SET_OperationType) ntohl (msg->operation))
652   {
653     GNUNET_break_op (0);
654     return GNUNET_SYSERR;
655   }
656   nested_context = GNUNET_MQ_extract_nested_mh (msg);
657   if ((NULL != nested_context) &&
658       (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
659   {
660     GNUNET_break_op (0);
661     return GNUNET_SYSERR;
662   }
663   return GNUNET_OK;
664 }
665
666
667 /**
668  * Handle a request for a set operation from another peer.  Checks if we
669  * have a listener waiting for such a request (and in that case initiates
670  * asking the listener about accepting the connection). If no listener
671  * is waiting, we queue the operation request in hope that a listener
672  * shows up soon (before timeout).
673  *
674  * This msg is expected as the first and only msg handled through the
675  * non-operation bound virtual table, acceptance of this operation replaces
676  * our virtual table and subsequent msgs would be routed differently (as
677  * we then know what type of operation this is).
678  *
679  * @param cls the operation state
680  * @param msg the received message
681  * @return #GNUNET_OK if the channel should be kept alive,
682  *         #GNUNET_SYSERR to destroy the channel
683  */
684 static void
685 handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
686 {
687   struct Operation *op = cls;
688   struct Listener *listener = op->listener;
689   const struct GNUNET_MessageHeader *nested_context;
690   struct GNUNET_MQ_Envelope *env;
691   struct GNUNET_SET_RequestMessage *cmsg;
692
693   nested_context = GNUNET_MQ_extract_nested_mh (msg);
694   /* Make a copy of the nested_context (application-specific context
695      information that is opaque to set) so we can pass it to the
696      listener later on */
697   if (NULL != nested_context)
698     op->context_msg = GNUNET_copy_message (nested_context);
699   op->remote_element_count = ntohl (msg->element_count);
700   GNUNET_log (
701     GNUNET_ERROR_TYPE_DEBUG,
702     "Received P2P operation request (op %u, port %s) for active listener\n",
703     (uint32_t) ntohl (msg->operation),
704     GNUNET_h2s (&op->listener->app_id));
705   GNUNET_assert (0 == op->suggest_id);
706   if (0 == suggest_id)
707     suggest_id++;
708   op->suggest_id = suggest_id++;
709   GNUNET_assert (NULL != op->timeout_task);
710   GNUNET_SCHEDULER_cancel (op->timeout_task);
711   op->timeout_task = NULL;
712   env = GNUNET_MQ_msg_nested_mh (cmsg,
713                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
714                                  op->context_msg);
715   GNUNET_log (
716     GNUNET_ERROR_TYPE_DEBUG,
717     "Suggesting incoming request with accept id %u to listener %p of client %p\n",
718     op->suggest_id,
719     listener,
720     listener->cs);
721   cmsg->accept_id = htonl (op->suggest_id);
722   cmsg->peer_id = op->peer;
723   GNUNET_MQ_send (listener->cs->mq, env);
724   /* NOTE: GNUNET_CADET_receive_done() will be called in
725    #handle_client_accept() */
726 }
727
728
729 /**
730  * Add an element to @a set as specified by @a msg
731  *
732  * @param set set to manipulate
733  * @param msg message specifying the change
734  */
735 static void
736 execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
737 {
738   struct GNUNET_SET_Element el;
739   struct ElementEntry *ee;
740   struct GNUNET_HashCode hash;
741
742   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
743   el.size = ntohs (msg->header.size) - sizeof(*msg);
744   el.data = &msg[1];
745   el.element_type = ntohs (msg->element_type);
746   GNUNET_SET_element_hash (&el, &hash);
747   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
748   if (NULL == ee)
749   {
750     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751                 "Client inserts element %s of size %u\n",
752                 GNUNET_h2s (&hash),
753                 el.size);
754     ee = GNUNET_malloc (el.size + sizeof(*ee));
755     ee->element.size = el.size;
756     GNUNET_memcpy (&ee[1], el.data, el.size);
757     ee->element.data = &ee[1];
758     ee->element.element_type = el.element_type;
759     ee->remote = GNUNET_NO;
760     ee->mutations = NULL;
761     ee->mutations_size = 0;
762     ee->element_hash = hash;
763     GNUNET_break (GNUNET_YES ==
764                   GNUNET_CONTAINER_multihashmap_put (
765                     set->content->elements,
766                     &ee->element_hash,
767                     ee,
768                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
769   }
770   else if (GNUNET_YES ==
771            is_element_of_generation (ee,
772                                      set->current_generation,
773                                      set->excluded_generations,
774                                      set->excluded_generations_size))
775   {
776     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777                 "Client inserted element %s of size %u twice (ignored)\n",
778                 GNUNET_h2s (&hash),
779                 el.size);
780
781     /* same element inserted twice */
782     return;
783   }
784
785   {
786     struct MutationEvent mut = { .generation = set->current_generation,
787                                  .added = GNUNET_YES };
788     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
789   }
790   set->vt->add (set->state, ee);
791 }
792
793
794 /**
795  * Remove an element from @a set as specified by @a msg
796  *
797  * @param set set to manipulate
798  * @param msg message specifying the change
799  */
800 static void
801 execute_remove (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
802 {
803   struct GNUNET_SET_Element el;
804   struct ElementEntry *ee;
805   struct GNUNET_HashCode hash;
806
807   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
808   el.size = ntohs (msg->header.size) - sizeof(*msg);
809   el.data = &msg[1];
810   el.element_type = ntohs (msg->element_type);
811   GNUNET_SET_element_hash (&el, &hash);
812   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
813   if (NULL == ee)
814   {
815     /* Client tried to remove non-existing element. */
816     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817                 "Client removes non-existing element of size %u\n",
818                 el.size);
819     return;
820   }
821   if (GNUNET_NO == is_element_of_generation (ee,
822                                              set->current_generation,
823                                              set->excluded_generations,
824                                              set->excluded_generations_size))
825   {
826     /* Client tried to remove element twice */
827     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828                 "Client removed element of size %u twice (ignored)\n",
829                 el.size);
830     return;
831   }
832   else
833   {
834     struct MutationEvent mut = { .generation = set->current_generation,
835                                  .added = GNUNET_NO };
836
837     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838                 "Client removes element of size %u\n",
839                 el.size);
840
841     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
842   }
843   set->vt->remove (set->state, ee);
844 }
845
846
847 /**
848  * Perform a mutation on a set as specified by the @a msg
849  *
850  * @param set the set to mutate
851  * @param msg specification of what to change
852  */
853 static void
854 execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
855 {
856   switch (ntohs (msg->header.type))
857   {
858   case GNUNET_MESSAGE_TYPE_SET_ADD:
859     execute_add (set, msg);
860     break;
861
862   case GNUNET_MESSAGE_TYPE_SET_REMOVE:
863     execute_remove (set, msg);
864     break;
865
866   default:
867     GNUNET_break (0);
868   }
869 }
870
871
872 /**
873  * Execute mutations that were delayed on a set because of
874  * pending operations.
875  *
876  * @param set the set to execute mutations on
877  */
878 static void
879 execute_delayed_mutations (struct Set *set)
880 {
881   struct PendingMutation *pm;
882
883   if (0 != set->content->iterator_count)
884     return; /* still cannot do this */
885   while (NULL != (pm = set->content->pending_mutations_head))
886   {
887     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
888                                  set->content->pending_mutations_tail,
889                                  pm);
890     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891                 "Executing pending mutation on %p.\n",
892                 pm->set);
893     execute_mutation (pm->set, pm->msg);
894     GNUNET_free (pm->msg);
895     GNUNET_free (pm);
896   }
897 }
898
899
900 /**
901  * Send the next element of a set to the set's client.  The next element is given by
902  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
903  * are no more elements in the set.  The caller must ensure that the set's iterator is
904  * valid.
905  *
906  * The client will acknowledge each received element with a
907  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
908  * #handle_client_iter_ack() will then trigger the next transmission.
909  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
910  *
911  * @param set set that should send its next element to its client
912  */
913 static void
914 send_client_element (struct Set *set)
915 {
916   int ret;
917   struct ElementEntry *ee;
918   struct GNUNET_MQ_Envelope *ev;
919   struct GNUNET_SET_IterResponseMessage *msg;
920
921   GNUNET_assert (NULL != set->iter);
922   do
923   {
924     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
925                                                        NULL,
926                                                        (const void **) &ee);
927     if (GNUNET_NO == ret)
928     {
929       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
930       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
931       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
932       set->iter = NULL;
933       set->iteration_id++;
934       GNUNET_assert (set->content->iterator_count > 0);
935       set->content->iterator_count--;
936       execute_delayed_mutations (set);
937       GNUNET_MQ_send (set->cs->mq, ev);
938       return;
939     }
940     GNUNET_assert (NULL != ee);
941   }
942   while (GNUNET_NO ==
943          is_element_of_generation (ee,
944                                    set->iter_generation,
945                                    set->excluded_generations,
946                                    set->excluded_generations_size));
947   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948               "Sending iteration element on %p.\n",
949               set);
950   ev = GNUNET_MQ_msg_extra (msg,
951                             ee->element.size,
952                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
953   GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size);
954   msg->element_type = htons (ee->element.element_type);
955   msg->iteration_id = htons (set->iteration_id);
956   GNUNET_MQ_send (set->cs->mq, ev);
957 }
958
959
960 /**
961  * Called when a client wants to iterate the elements of a set.
962  * Checks if we have a set associated with the client and if we
963  * can right now start an iteration. If all checks out, starts
964  * sending the elements of the set to the client.
965  *
966  * @param cls client that sent the message
967  * @param m message sent by the client
968  */
969 static void
970 handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m)
971 {
972   struct ClientState *cs = cls;
973   struct Set *set;
974
975   if (NULL == (set = cs->set))
976   {
977     /* attempt to iterate over a non existing set */
978     GNUNET_break (0);
979     GNUNET_SERVICE_client_drop (cs->client);
980     return;
981   }
982   if (NULL != set->iter)
983   {
984     /* Only one concurrent iterate-action allowed per set */
985     GNUNET_break (0);
986     GNUNET_SERVICE_client_drop (cs->client);
987     return;
988   }
989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990               "Iterating set %p in gen %u with %u content elements\n",
991               (void *) set,
992               set->current_generation,
993               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
994   GNUNET_SERVICE_client_continue (cs->client);
995   set->content->iterator_count++;
996   set->iter =
997     GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
998   set->iter_generation = set->current_generation;
999   send_client_element (set);
1000 }
1001
1002
1003 /**
1004  * Called when a client wants to create a new set.  This is typically
1005  * the first request from a client, and includes the type of set
1006  * operation to be performed.
1007  *
1008  * @param cls client that sent the message
1009  * @param m message sent by the client
1010  */
1011 static void
1012 handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg)
1013 {
1014   struct ClientState *cs = cls;
1015   struct Set *set;
1016
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018               "Client created new set (operation %u)\n",
1019               (uint32_t) ntohl (msg->operation));
1020   if (NULL != cs->set)
1021   {
1022     /* There can only be one set per client */
1023     GNUNET_break (0);
1024     GNUNET_SERVICE_client_drop (cs->client);
1025     return;
1026   }
1027   set = GNUNET_new (struct Set);
1028   switch (ntohl (msg->operation))
1029   {
1030   case GNUNET_SET_OPERATION_INTERSECTION:
1031     set->vt = _GSS_intersection_vt ();
1032     break;
1033
1034   case GNUNET_SET_OPERATION_UNION:
1035     set->vt = _GSS_union_vt ();
1036     break;
1037
1038   default:
1039     GNUNET_free (set);
1040     GNUNET_break (0);
1041     GNUNET_SERVICE_client_drop (cs->client);
1042     return;
1043   }
1044   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1045   set->state = set->vt->create ();
1046   if (NULL == set->state)
1047   {
1048     /* initialization failed (i.e. out of memory) */
1049     GNUNET_free (set);
1050     GNUNET_SERVICE_client_drop (cs->client);
1051     return;
1052   }
1053   set->content = GNUNET_new (struct SetContent);
1054   set->content->refcount = 1;
1055   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1056   set->cs = cs;
1057   cs->set = set;
1058   GNUNET_SERVICE_client_continue (cs->client);
1059 }
1060
1061
1062 /**
1063  * Timeout happens iff:
1064  *  - we suggested an operation to our listener,
1065  *    but did not receive a response in time
1066  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1067  *
1068  * @param cls channel context
1069  * @param tc context information (why was this task triggered now)
1070  */
1071 static void
1072 incoming_timeout_cb (void *cls)
1073 {
1074   struct Operation *op = cls;
1075
1076   op->timeout_task = NULL;
1077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078               "Remote peer's incoming request timed out\n");
1079   incoming_destroy (op);
1080 }
1081
1082
1083 /**
1084  * Method called whenever another peer has added us to a channel the
1085  * other peer initiated.  Only called (once) upon reception of data
1086  * from a channel we listen on.
1087  *
1088  * The channel context represents the operation itself and gets added
1089  * to a DLL, from where it gets looked up when our local listener
1090  * client responds to a proposed/suggested operation or connects and
1091  * associates with this operation.
1092  *
1093  * @param cls closure
1094  * @param channel new handle to the channel
1095  * @param source peer that started the channel
1096  * @return initial channel context for the channel
1097  *         returns NULL on error
1098  */
1099 static void *
1100 channel_new_cb (void *cls,
1101                 struct GNUNET_CADET_Channel *channel,
1102                 const struct GNUNET_PeerIdentity *source)
1103 {
1104   struct Listener *listener = cls;
1105   struct Operation *op;
1106
1107   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1108   op = GNUNET_new (struct Operation);
1109   op->listener = listener;
1110   op->peer = *source;
1111   op->channel = channel;
1112   op->mq = GNUNET_CADET_get_mq (op->channel);
1113   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1114   op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1115                                                    &incoming_timeout_cb,
1116                                                    op);
1117   GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op);
1118   return op;
1119 }
1120
1121
1122 /**
1123  * Function called whenever a channel is destroyed.  Should clean up
1124  * any associated state.  It must NOT call
1125  * GNUNET_CADET_channel_destroy() on the channel.
1126  *
1127  * The peer_disconnect function is part of a a virtual table set initially either
1128  * when a peer creates a new channel with us, or once we create
1129  * a new channel ourselves (evaluate).
1130  *
1131  * Once we know the exact type of operation (union/intersection), the vt is
1132  * replaced with an operation specific instance (_GSS_[op]_vt).
1133  *
1134  * @param channel_ctx place where local state associated
1135  *                   with the channel is stored
1136  * @param channel connection to the other end (henceforth invalid)
1137  */
1138 static void
1139 channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1140 {
1141   struct Operation *op = channel_ctx;
1142
1143   op->channel = NULL;
1144   _GSS_operation_destroy2 (op);
1145 }
1146
1147
1148 /**
1149  * This function probably should not exist
1150  * and be replaced by inlining more specific
1151  * logic in the various places where it is called.
1152  */
1153 void
1154 _GSS_operation_destroy2 (struct Operation *op)
1155 {
1156   struct GNUNET_CADET_Channel *channel;
1157
1158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
1159   if (NULL != (channel = op->channel))
1160   {
1161     /* This will free op; called conditionally as this helper function
1162        is also called from within the channel disconnect handler. */
1163     op->channel = NULL;
1164     GNUNET_CADET_channel_destroy (channel);
1165   }
1166   if (NULL != op->listener)
1167   {
1168     incoming_destroy (op);
1169     return;
1170   }
1171   if (NULL != op->set)
1172     op->set->vt->channel_death (op);
1173   else
1174     _GSS_operation_destroy (op, GNUNET_YES);
1175   GNUNET_free (op);
1176 }
1177
1178
1179 /**
1180  * Function called whenever an MQ-channel's transmission window size changes.
1181  *
1182  * The first callback in an outgoing channel will be with a non-zero value
1183  * and will mean the channel is connected to the destination.
1184  *
1185  * For an incoming channel it will be called immediately after the
1186  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1187  *
1188  * @param cls Channel closure.
1189  * @param channel Connection to the other end (henceforth invalid).
1190  * @param window_size New window size. If the is more messages than buffer size
1191  *                    this value will be negative..
1192  */
1193 static void
1194 channel_window_cb (void *cls,
1195                    const struct GNUNET_CADET_Channel *channel,
1196                    int window_size)
1197 {
1198   /* FIXME: not implemented, we could do flow control here... */
1199 }
1200
1201
1202 /**
1203  * Called when a client wants to create a new listener.
1204  *
1205  * @param cls client that sent the message
1206  * @param msg message sent by the client
1207  */
1208 static void
1209 handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg)
1210 {
1211   struct ClientState *cs = cls;
1212   struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1213   { GNUNET_MQ_hd_var_size (incoming_msg,
1214                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1215                            struct OperationRequestMessage,
1216                            NULL),
1217     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1218                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1219                            struct IBFMessage,
1220                            NULL),
1221     GNUNET_MQ_hd_var_size (union_p2p_elements,
1222                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1223                            struct GNUNET_SET_ElementMessage,
1224                            NULL),
1225     GNUNET_MQ_hd_var_size (union_p2p_offer,
1226                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1227                            struct GNUNET_MessageHeader,
1228                            NULL),
1229     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1230                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1231                            struct InquiryMessage,
1232                            NULL),
1233     GNUNET_MQ_hd_var_size (union_p2p_demand,
1234                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1235                            struct GNUNET_MessageHeader,
1236                            NULL),
1237     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1238                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1239                              struct GNUNET_MessageHeader,
1240                              NULL),
1241     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1242                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1243                              struct GNUNET_MessageHeader,
1244                              NULL),
1245     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1246                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1247                              struct GNUNET_MessageHeader,
1248                              NULL),
1249     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1250                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1251                              struct GNUNET_MessageHeader,
1252                              NULL),
1253     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1254                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1255                            struct StrataEstimatorMessage,
1256                            NULL),
1257     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1258                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1259                            struct StrataEstimatorMessage,
1260                            NULL),
1261     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1262                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1263                            struct GNUNET_SET_ElementMessage,
1264                            NULL),
1265     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1266                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1267                              struct IntersectionElementInfoMessage,
1268                              NULL),
1269     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1270                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1271                            struct BFMessage,
1272                            NULL),
1273     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1274                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1275                              struct IntersectionDoneMessage,
1276                              NULL),
1277     GNUNET_MQ_handler_end () };
1278   struct Listener *listener;
1279
1280   if (NULL != cs->listener)
1281   {
1282     /* max. one active listener per client! */
1283     GNUNET_break (0);
1284     GNUNET_SERVICE_client_drop (cs->client);
1285     return;
1286   }
1287   listener = GNUNET_new (struct Listener);
1288   listener->cs = cs;
1289   cs->listener = listener;
1290   listener->app_id = msg->app_id;
1291   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1292   GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener);
1293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294               "New listener created (op %u, port %s)\n",
1295               listener->operation,
1296               GNUNET_h2s (&listener->app_id));
1297   listener->open_port = GNUNET_CADET_open_port (cadet,
1298                                                 &msg->app_id,
1299                                                 &channel_new_cb,
1300                                                 listener,
1301                                                 &channel_window_cb,
1302                                                 &channel_end_cb,
1303                                                 cadet_handlers);
1304   GNUNET_SERVICE_client_continue (cs->client);
1305 }
1306
1307
1308 /**
1309  * Called when the listening client rejects an operation
1310  * request by another peer.
1311  *
1312  * @param cls client that sent the message
1313  * @param msg message sent by the client
1314  */
1315 static void
1316 handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg)
1317 {
1318   struct ClientState *cs = cls;
1319   struct Operation *op;
1320
1321   op = get_incoming (ntohl (msg->accept_reject_id));
1322   if (NULL == op)
1323   {
1324     /* no matching incoming operation for this reject;
1325        could be that the other peer already disconnected... */
1326     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1327                 "Client rejected unknown operation %u\n",
1328                 (unsigned int) ntohl (msg->accept_reject_id));
1329     GNUNET_SERVICE_client_continue (cs->client);
1330     return;
1331   }
1332   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1333               "Peer request (op %u, app %s) rejected by client\n",
1334               op->listener->operation,
1335               GNUNET_h2s (&cs->listener->app_id));
1336   _GSS_operation_destroy2 (op);
1337   GNUNET_SERVICE_client_continue (cs->client);
1338 }
1339
1340
1341 /**
1342  * Called when a client wants to add or remove an element to a set it inhabits.
1343  *
1344  * @param cls client that sent the message
1345  * @param msg message sent by the client
1346  */
1347 static int
1348 check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1349 {
1350   /* NOTE: Technically, we should probably check with the
1351      block library whether the element we are given is well-formed */
1352   return GNUNET_OK;
1353 }
1354
1355
1356 /**
1357  * Called when a client wants to add or remove an element to a set it inhabits.
1358  *
1359  * @param cls client that sent the message
1360  * @param msg message sent by the client
1361  */
1362 static void
1363 handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1364 {
1365   struct ClientState *cs = cls;
1366   struct Set *set;
1367
1368   if (NULL == (set = cs->set))
1369   {
1370     /* client without a set requested an operation */
1371     GNUNET_break (0);
1372     GNUNET_SERVICE_client_drop (cs->client);
1373     return;
1374   }
1375   GNUNET_SERVICE_client_continue (cs->client);
1376
1377   if (0 != set->content->iterator_count)
1378   {
1379     struct PendingMutation *pm;
1380
1381     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1382     pm = GNUNET_new (struct PendingMutation);
1383     pm->msg =
1384       (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1385     pm->set = set;
1386     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1387                                       set->content->pending_mutations_tail,
1388                                       pm);
1389     return;
1390   }
1391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1392   execute_mutation (set, msg);
1393 }
1394
1395
1396 /**
1397  * Advance the current generation of a set,
1398  * adding exclusion ranges if necessary.
1399  *
1400  * @param set the set where we want to advance the generation
1401  */
1402 static void
1403 advance_generation (struct Set *set)
1404 {
1405   struct GenerationRange r;
1406
1407   if (set->current_generation == set->content->latest_generation)
1408   {
1409     set->content->latest_generation++;
1410     set->current_generation++;
1411     return;
1412   }
1413
1414   GNUNET_assert (set->current_generation < set->content->latest_generation);
1415
1416   r.start = set->current_generation + 1;
1417   r.end = set->content->latest_generation + 1;
1418   set->content->latest_generation = r.end;
1419   set->current_generation = r.end;
1420   GNUNET_array_append (set->excluded_generations,
1421                        set->excluded_generations_size,
1422                        r);
1423 }
1424
1425
1426 /**
1427  * Called when a client wants to initiate a set operation with another
1428  * peer.  Initiates the CADET connection to the listener and sends the
1429  * request.
1430  *
1431  * @param cls client that sent the message
1432  * @param msg message sent by the client
1433  * @return #GNUNET_OK if the message is well-formed
1434  */
1435 static int
1436 check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1437 {
1438   /* FIXME: suboptimal, even if the context below could be NULL,
1439      there are malformed messages this does not check for... */
1440   return GNUNET_OK;
1441 }
1442
1443
1444 /**
1445  * Called when a client wants to initiate a set operation with another
1446  * peer.  Initiates the CADET connection to the listener and sends the
1447  * request.
1448  *
1449  * @param cls client that sent the message
1450  * @param msg message sent by the client
1451  */
1452 static void
1453 handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1454 {
1455   struct ClientState *cs = cls;
1456   struct Operation *op = GNUNET_new (struct Operation);
1457   const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1458   { GNUNET_MQ_hd_var_size (incoming_msg,
1459                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1460                            struct OperationRequestMessage,
1461                            op),
1462     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1463                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1464                            struct IBFMessage,
1465                            op),
1466     GNUNET_MQ_hd_var_size (union_p2p_elements,
1467                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1468                            struct GNUNET_SET_ElementMessage,
1469                            op),
1470     GNUNET_MQ_hd_var_size (union_p2p_offer,
1471                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1472                            struct GNUNET_MessageHeader,
1473                            op),
1474     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1475                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1476                            struct InquiryMessage,
1477                            op),
1478     GNUNET_MQ_hd_var_size (union_p2p_demand,
1479                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1480                            struct GNUNET_MessageHeader,
1481                            op),
1482     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1483                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1484                              struct GNUNET_MessageHeader,
1485                              op),
1486     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1487                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1488                              struct GNUNET_MessageHeader,
1489                              op),
1490     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1491                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1492                              struct GNUNET_MessageHeader,
1493                              op),
1494     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1495                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1496                              struct GNUNET_MessageHeader,
1497                              op),
1498     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1499                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1500                            struct StrataEstimatorMessage,
1501                            op),
1502     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1503                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1504                            struct StrataEstimatorMessage,
1505                            op),
1506     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1507                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1508                            struct GNUNET_SET_ElementMessage,
1509                            op),
1510     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1511                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1512                              struct IntersectionElementInfoMessage,
1513                              op),
1514     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1515                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1516                            struct BFMessage,
1517                            op),
1518     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1519                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1520                              struct IntersectionDoneMessage,
1521                              op),
1522     GNUNET_MQ_handler_end () };
1523   struct Set *set;
1524   const struct GNUNET_MessageHeader *context;
1525
1526   if (NULL == (set = cs->set))
1527   {
1528     GNUNET_break (0);
1529     GNUNET_free (op);
1530     GNUNET_SERVICE_client_drop (cs->client);
1531     return;
1532   }
1533   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1534   op->peer = msg->target_peer;
1535   op->result_mode = ntohl (msg->result_mode);
1536   op->client_request_id = ntohl (msg->request_id);
1537   op->byzantine = msg->byzantine;
1538   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1539   op->force_full = msg->force_full;
1540   op->force_delta = msg->force_delta;
1541   context = GNUNET_MQ_extract_nested_mh (msg);
1542
1543   /* Advance generation values, so that
1544      mutations won't interfer with the running operation. */
1545   op->set = set;
1546   op->generation_created = set->current_generation;
1547   advance_generation (set);
1548   GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550               "Creating new CADET channel to port %s for set operation type %u\n",
1551               GNUNET_h2s (&msg->app_id),
1552               set->operation);
1553   op->channel = GNUNET_CADET_channel_create (cadet,
1554                                              op,
1555                                              &msg->target_peer,
1556                                              &msg->app_id,
1557                                              &channel_window_cb,
1558                                              &channel_end_cb,
1559                                              cadet_handlers);
1560   op->mq = GNUNET_CADET_get_mq (op->channel);
1561   op->state = set->vt->evaluate (op, context);
1562   if (NULL == op->state)
1563   {
1564     GNUNET_break (0);
1565     GNUNET_SERVICE_client_drop (cs->client);
1566     return;
1567   }
1568   GNUNET_SERVICE_client_continue (cs->client);
1569 }
1570
1571
1572 /**
1573  * Handle an ack from a client, and send the next element. Note
1574  * that we only expect acks for set elements, not after the
1575  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1576  *
1577  * @param cls client the client
1578  * @param ack the message
1579  */
1580 static void
1581 handle_client_iter_ack (void *cls, const struct GNUNET_SET_IterAckMessage *ack)
1582 {
1583   struct ClientState *cs = cls;
1584   struct Set *set;
1585
1586   if (NULL == (set = cs->set))
1587   {
1588     /* client without a set acknowledged receiving a value */
1589     GNUNET_break (0);
1590     GNUNET_SERVICE_client_drop (cs->client);
1591     return;
1592   }
1593   if (NULL == set->iter)
1594   {
1595     /* client sent an ack, but we were not expecting one (as
1596        set iteration has finished) */
1597     GNUNET_break (0);
1598     GNUNET_SERVICE_client_drop (cs->client);
1599     return;
1600   }
1601   GNUNET_SERVICE_client_continue (cs->client);
1602   if (ntohl (ack->send_more))
1603   {
1604     send_client_element (set);
1605   }
1606   else
1607   {
1608     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1609     set->iter = NULL;
1610     set->iteration_id++;
1611   }
1612 }
1613
1614
1615 /**
1616  * Handle a request from the client to copy a set.
1617  *
1618  * @param cls the client
1619  * @param mh the message
1620  */
1621 static void
1622 handle_client_copy_lazy_prepare (void *cls,
1623                                  const struct GNUNET_MessageHeader *mh)
1624 {
1625   struct ClientState *cs = cls;
1626   struct Set *set;
1627   struct LazyCopyRequest *cr;
1628   struct GNUNET_MQ_Envelope *ev;
1629   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1630
1631   if (NULL == (set = cs->set))
1632   {
1633     /* client without a set requested an operation */
1634     GNUNET_break (0);
1635     GNUNET_SERVICE_client_drop (cs->client);
1636     return;
1637   }
1638   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1639               "Client requested creation of lazy copy\n");
1640   cr = GNUNET_new (struct LazyCopyRequest);
1641   cr->cookie = ++lazy_copy_cookie;
1642   cr->source_set = set;
1643   GNUNET_CONTAINER_DLL_insert (lazy_copy_head, lazy_copy_tail, cr);
1644   ev = GNUNET_MQ_msg (resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1645   resp_msg->cookie = cr->cookie;
1646   GNUNET_MQ_send (set->cs->mq, ev);
1647   GNUNET_SERVICE_client_continue (cs->client);
1648 }
1649
1650
1651 /**
1652  * Handle a request from the client to connect to a copy of a set.
1653  *
1654  * @param cls the client
1655  * @param msg the message
1656  */
1657 static void
1658 handle_client_copy_lazy_connect (
1659   void *cls,
1660   const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1661 {
1662   struct ClientState *cs = cls;
1663   struct LazyCopyRequest *cr;
1664   struct Set *set;
1665   int found;
1666
1667   if (NULL != cs->set)
1668   {
1669     /* There can only be one set per client */
1670     GNUNET_break (0);
1671     GNUNET_SERVICE_client_drop (cs->client);
1672     return;
1673   }
1674   found = GNUNET_NO;
1675   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1676   {
1677     if (cr->cookie == msg->cookie)
1678     {
1679       found = GNUNET_YES;
1680       break;
1681     }
1682   }
1683   if (GNUNET_NO == found)
1684   {
1685     /* client asked for copy with cookie we don't know */
1686     GNUNET_break (0);
1687     GNUNET_SERVICE_client_drop (cs->client);
1688     return;
1689   }
1690   GNUNET_CONTAINER_DLL_remove (lazy_copy_head, lazy_copy_tail, cr);
1691   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1692               "Client %p requested use of lazy copy\n",
1693               cs);
1694   set = GNUNET_new (struct Set);
1695   switch (cr->source_set->operation)
1696   {
1697   case GNUNET_SET_OPERATION_INTERSECTION:
1698     set->vt = _GSS_intersection_vt ();
1699     break;
1700
1701   case GNUNET_SET_OPERATION_UNION:
1702     set->vt = _GSS_union_vt ();
1703     break;
1704
1705   default:
1706     GNUNET_assert (0);
1707     return;
1708   }
1709
1710   if (NULL == set->vt->copy_state)
1711   {
1712     /* Lazy copy not supported for this set operation */
1713     GNUNET_break (0);
1714     GNUNET_free (set);
1715     GNUNET_free (cr);
1716     GNUNET_SERVICE_client_drop (cs->client);
1717     return;
1718   }
1719
1720   set->operation = cr->source_set->operation;
1721   set->state = set->vt->copy_state (cr->source_set->state);
1722   set->content = cr->source_set->content;
1723   set->content->refcount++;
1724
1725   set->current_generation = cr->source_set->current_generation;
1726   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1727   set->excluded_generations =
1728     GNUNET_memdup (cr->source_set->excluded_generations,
1729                    set->excluded_generations_size
1730                    * sizeof(struct GenerationRange));
1731
1732   /* Advance the generation of the new set, so that mutations to the
1733      of the cloned set and the source set are independent. */
1734   advance_generation (set);
1735   set->cs = cs;
1736   cs->set = set;
1737   GNUNET_free (cr);
1738   GNUNET_SERVICE_client_continue (cs->client);
1739 }
1740
1741
1742 /**
1743  * Handle a request from the client to cancel a running set operation.
1744  *
1745  * @param cls the client
1746  * @param msg the message
1747  */
1748 static void
1749 handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg)
1750 {
1751   struct ClientState *cs = cls;
1752   struct Set *set;
1753   struct Operation *op;
1754   int found;
1755
1756   if (NULL == (set = cs->set))
1757   {
1758     /* client without a set requested an operation */
1759     GNUNET_break (0);
1760     GNUNET_SERVICE_client_drop (cs->client);
1761     return;
1762   }
1763   found = GNUNET_NO;
1764   for (op = set->ops_head; NULL != op; op = op->next)
1765   {
1766     if (op->client_request_id == ntohl (msg->request_id))
1767     {
1768       found = GNUNET_YES;
1769       break;
1770     }
1771   }
1772   if (GNUNET_NO == found)
1773   {
1774     /* It may happen that the operation was already destroyed due to
1775      * the other peer disconnecting.  The client may not know about this
1776      * yet and try to cancel the (just barely non-existent) operation.
1777      * So this is not a hard error.
1778      */GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1779                 "Client canceled non-existent op %u\n",
1780                 (uint32_t) ntohl (msg->request_id));
1781   }
1782   else
1783   {
1784     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785                 "Client requested cancel for op %u\n",
1786                 (uint32_t) ntohl (msg->request_id));
1787     _GSS_operation_destroy (op, GNUNET_YES);
1788   }
1789   GNUNET_SERVICE_client_continue (cs->client);
1790 }
1791
1792
1793 /**
1794  * Handle a request from the client to accept a set operation that
1795  * came from a remote peer.  We forward the accept to the associated
1796  * operation for handling
1797  *
1798  * @param cls the client
1799  * @param msg the message
1800  */
1801 static void
1802 handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg)
1803 {
1804   struct ClientState *cs = cls;
1805   struct Set *set;
1806   struct Operation *op;
1807   struct GNUNET_SET_ResultMessage *result_message;
1808   struct GNUNET_MQ_Envelope *ev;
1809   struct Listener *listener;
1810
1811   if (NULL == (set = cs->set))
1812   {
1813     /* client without a set requested to accept */
1814     GNUNET_break (0);
1815     GNUNET_SERVICE_client_drop (cs->client);
1816     return;
1817   }
1818   op = get_incoming (ntohl (msg->accept_reject_id));
1819   if (NULL == op)
1820   {
1821     /* It is not an error if the set op does not exist -- it may
1822     * have been destroyed when the partner peer disconnected. */
1823     GNUNET_log (
1824       GNUNET_ERROR_TYPE_INFO,
1825       "Client %p accepted request %u of listener %p that is no longer active\n",
1826       cs,
1827       ntohl (msg->accept_reject_id),
1828       cs->listener);
1829     ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1830     result_message->request_id = msg->request_id;
1831     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1832     GNUNET_MQ_send (set->cs->mq, ev);
1833     GNUNET_SERVICE_client_continue (cs->client);
1834     return;
1835   }
1836   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837               "Client accepting request %u\n",
1838               (uint32_t) ntohl (msg->accept_reject_id));
1839   listener = op->listener;
1840   op->listener = NULL;
1841   GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
1842   op->set = set;
1843   GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1844   op->client_request_id = ntohl (msg->request_id);
1845   op->result_mode = ntohl (msg->result_mode);
1846   op->byzantine = msg->byzantine;
1847   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1848   op->force_full = msg->force_full;
1849   op->force_delta = msg->force_delta;
1850
1851   /* Advance generation values, so that future mutations do not
1852      interfer with the running operation. */
1853   op->generation_created = set->current_generation;
1854   advance_generation (set);
1855   GNUNET_assert (NULL == op->state);
1856   op->state = set->vt->accept (op);
1857   if (NULL == op->state)
1858   {
1859     GNUNET_break (0);
1860     GNUNET_SERVICE_client_drop (cs->client);
1861     return;
1862   }
1863   /* Now allow CADET to continue, as we did not do this in
1864    #handle_incoming_msg (as we wanted to first see if the
1865      local client would accept the request). */
1866   GNUNET_CADET_receive_done (op->channel);
1867   GNUNET_SERVICE_client_continue (cs->client);
1868 }
1869
1870
1871 /**
1872  * Called to clean up, after a shutdown has been requested.
1873  *
1874  * @param cls closure, NULL
1875  */
1876 static void
1877 shutdown_task (void *cls)
1878 {
1879   /* Delay actual shutdown to allow service to disconnect clients */
1880   in_shutdown = GNUNET_YES;
1881   if (0 == num_clients)
1882   {
1883     if (NULL != cadet)
1884     {
1885       GNUNET_CADET_disconnect (cadet);
1886       cadet = NULL;
1887     }
1888   }
1889   GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
1890   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1891 }
1892
1893
1894 /**
1895  * Function called by the service's run
1896  * method to run service-specific setup code.
1897  *
1898  * @param cls closure
1899  * @param cfg configuration to use
1900  * @param service the initialized service
1901  */
1902 static void
1903 run (void *cls,
1904      const struct GNUNET_CONFIGURATION_Handle *cfg,
1905      struct GNUNET_SERVICE_Handle *service)
1906 {
1907   /* FIXME: need to modify SERVICE (!) API to allow
1908      us to run a shutdown task *after* clients were
1909      forcefully disconnected! */
1910   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1911   _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
1912   cadet = GNUNET_CADET_connect (cfg);
1913   if (NULL == cadet)
1914   {
1915     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1916                 _ ("Could not connect to CADET service\n"));
1917     GNUNET_SCHEDULER_shutdown ();
1918     return;
1919   }
1920 }
1921
1922
1923 /**
1924  * Define "main" method using service macro.
1925  */
1926 GNUNET_SERVICE_MAIN (
1927   "set",
1928   GNUNET_SERVICE_OPTION_NONE,
1929   &run,
1930   &client_connect_cb,
1931   &client_disconnect_cb,
1932   NULL,
1933   GNUNET_MQ_hd_fixed_size (client_accept,
1934                            GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1935                            struct GNUNET_SET_AcceptMessage,
1936                            NULL),
1937   GNUNET_MQ_hd_fixed_size (client_iter_ack,
1938                            GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1939                            struct GNUNET_SET_IterAckMessage,
1940                            NULL),
1941   GNUNET_MQ_hd_var_size (client_mutation,
1942                          GNUNET_MESSAGE_TYPE_SET_ADD,
1943                          struct GNUNET_SET_ElementMessage,
1944                          NULL),
1945   GNUNET_MQ_hd_fixed_size (client_create_set,
1946                            GNUNET_MESSAGE_TYPE_SET_CREATE,
1947                            struct GNUNET_SET_CreateMessage,
1948                            NULL),
1949   GNUNET_MQ_hd_fixed_size (client_iterate,
1950                            GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1951                            struct GNUNET_MessageHeader,
1952                            NULL),
1953   GNUNET_MQ_hd_var_size (client_evaluate,
1954                          GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1955                          struct GNUNET_SET_EvaluateMessage,
1956                          NULL),
1957   GNUNET_MQ_hd_fixed_size (client_listen,
1958                            GNUNET_MESSAGE_TYPE_SET_LISTEN,
1959                            struct GNUNET_SET_ListenMessage,
1960                            NULL),
1961   GNUNET_MQ_hd_fixed_size (client_reject,
1962                            GNUNET_MESSAGE_TYPE_SET_REJECT,
1963                            struct GNUNET_SET_RejectMessage,
1964                            NULL),
1965   GNUNET_MQ_hd_var_size (client_mutation,
1966                          GNUNET_MESSAGE_TYPE_SET_REMOVE,
1967                          struct GNUNET_SET_ElementMessage,
1968                          NULL),
1969   GNUNET_MQ_hd_fixed_size (client_cancel,
1970                            GNUNET_MESSAGE_TYPE_SET_CANCEL,
1971                            struct GNUNET_SET_CancelMessage,
1972                            NULL),
1973   GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
1974                            GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1975                            struct GNUNET_MessageHeader,
1976                            NULL),
1977   GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
1978                            GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1979                            struct GNUNET_SET_CopyLazyConnectMessage,
1980                            NULL),
1981   GNUNET_MQ_handler_end ());
1982
1983
1984 /* end of gnunet-service-set.c */