added initial iterators for alice and bob to create their mutated bloomfilters and...
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "gnunet-service-set.h"
27 #include "set_protocol.h"
28
29
30 /**
31  * State of an operation where the peer has connected to us, but is not yet
32  * evaluating a set operation.  Once the peer has sent a concrete request, and
33  * the client has accepted or rejected it, this information will be deleted
34  * and replaced by the real set operation state.
35  */
36 struct OperationState
37 {
38   /**
39    * The identity of the requesting peer.  Needs to
40    * be stored here as the op spec might not have been created yet.
41    */
42   struct GNUNET_PeerIdentity peer;
43
44   /**
45    * Unique request id for the request from
46    * a remote peer, sent to the client, which will
47    * accept or reject the request.
48    * Set to '0' iff the request has not been
49    * suggested yet.
50    */
51   uint32_t suggest_id;
52
53   /**
54    * Timeout task, if the incoming peer has not been accepted
55    * after the timeout, it will be disconnected.
56    */
57   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
58 };
59
60
61 /**
62  * A listener is inhabited by a client, and
63  * waits for evaluation requests from remote peers.
64  */
65 struct Listener
66 {
67   /**
68    * Listeners are held in a doubly linked list.
69    */
70   struct Listener *next;
71
72   /**
73    * Listeners are held in a doubly linked list.
74    */
75   struct Listener *prev;
76
77   /**
78    * Client that owns the listener.
79    * Only one client may own a listener.
80    */
81   struct GNUNET_SERVER_Client *client;
82
83   /**
84    * Message queue for the client
85    */
86   struct GNUNET_MQ_Handle *client_mq;
87
88   /**
89    * The type of the operation.
90    */
91   enum GNUNET_SET_OperationType operation;
92
93   /**
94    * Application ID for the operation, used to distinguish
95    * multiple operations of the same type with the same peer.
96    */
97   struct GNUNET_HashCode app_id;
98 };
99
100
101 /**
102  * Configuration of our local peer.
103  */
104 static const struct GNUNET_CONFIGURATION_Handle *configuration;
105
106 /**
107  * Handle to the mesh service, used
108  * to listen for and connect to remote peers.
109  */
110 static struct GNUNET_MESH_Handle *mesh;
111
112 /**
113  * Sets are held in a doubly linked list.
114  */
115 static struct Set *sets_head;
116
117 /**
118  * Sets are held in a doubly linked list.
119  */
120 static struct Set *sets_tail;
121
122 /**
123  * Listeners are held in a doubly linked list.
124  */
125 static struct Listener *listeners_head;
126
127 /**
128  * Listeners are held in a doubly linked list.
129  */
130 static struct Listener *listeners_tail;
131
132 /**
133  * Incoming sockets from remote peers are
134  * held in a doubly linked list.
135  */
136 static struct Operation *incoming_head;
137
138 /**
139  * Incoming sockets from remote peers are
140  * held in a doubly linked list.
141  */
142 static struct Operation *incoming_tail;
143
144 /**
145  * Counter for allocating unique IDs for clients,
146  * used to identify incoming operation requests from remote peers,
147  * that the client can choose to accept or refuse.
148  */
149 static uint32_t suggest_id = 1;
150
151
152 /**
153  * Get set that is owned by the given client, if any.
154  *
155  * @param client client to look for
156  * @return set that the client owns, NULL if the client
157  *         does not own a set
158  */
159 static struct Set *
160 set_get (struct GNUNET_SERVER_Client *client)
161 {
162   struct Set *set;
163
164   for (set = sets_head; NULL != set; set = set->next)
165     if (set->client == client)
166       return set;
167   return NULL;
168 }
169
170
171 /**
172  * Get the listener associated with the given client, if any.
173  *
174  * @param client the client
175  * @return listener associated with the client, NULL
176  *         if there isn't any
177  */
178 static struct Listener *
179 listener_get (struct GNUNET_SERVER_Client *client)
180 {
181   struct Listener *listener;
182
183   for (listener = listeners_head; NULL != listener; listener = listener->next)
184     if (listener->client == client)
185       return listener;
186   return NULL;
187 }
188
189
190 /**
191  * Get the incoming socket associated with the given id.
192  *
193  * @param id id to look for
194  * @return the incoming socket associated with the id,
195  *         or NULL if there is none
196  */
197 static struct Operation *
198 get_incoming (uint32_t id)
199 {
200   struct Operation *op;
201
202   for (op = incoming_head; NULL != op; op = op)
203     if (op->state->suggest_id == id)
204       return op;
205   return NULL;
206 }
207
208
209 /**
210  * Destroy a listener, free all resources associated with it.
211  *
212  * @param listener listener to destroy
213  */
214 static void
215 listener_destroy (struct Listener *listener)
216 {
217   /* If the client is not dead yet, destroy it.
218    * The client's destroy callback will destroy the listener again. */
219   if (NULL != listener->client)
220   {
221     struct GNUNET_SERVER_Client *client = listener->client;
222     listener->client = NULL;
223     GNUNET_SERVER_client_disconnect (client);
224     return;
225   }
226   if (NULL != listener->client_mq)
227   {
228     GNUNET_MQ_destroy (listener->client_mq);
229     listener->client_mq = NULL;
230   }
231   GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener);
232   GNUNET_free (listener);
233 }
234
235
236 /**
237  * Iterator over hash map entries to free
238  * element entries.
239  *
240  * @param cls closure
241  * @param key current key code
242  * @param value value in the hash map
243  * @return GNUNET_YES if we should continue to
244  *         iterate,
245  *         GNUNET_NO if not.
246  */
247 static int
248 destroy_elements_iterator (void *cls,
249                            const struct GNUNET_HashCode * key,
250                            void *value)
251 {
252   struct ElementEntry *ee = value;
253
254   GNUNET_free (ee);
255   return GNUNET_YES;
256 }
257
258
259 /**
260  * Collect and destroy elements that are not needed anymore, because
261  * their lifetime (as determined by their generation) does not overlap with any active
262  * set operation.
263  */
264 void
265 collect_generation_garbage (struct Set *set)
266 {
267   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
268   struct ElementEntry *ee;
269   struct GNUNET_CONTAINER_MultiHashMap *new_elements;
270   int res;
271   struct Operation *op;
272
273   new_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
274   iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->elements);
275   while (GNUNET_OK ==
276          (res = GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ee)))
277   {
278     if (GNUNET_NO == ee->removed)
279       goto still_needed;
280     for (op = set->ops_head; NULL != op; op = op->next)
281       if ( (op->generation_created >= ee->generation_added) &&
282            (op->generation_created < ee->generation_removed) )
283         goto still_needed;
284     GNUNET_free (ee);
285     continue;
286 still_needed:
287       // we don't expect collisions, thus the replace option
288       GNUNET_CONTAINER_multihashmap_put (new_elements, &ee->element_hash, ee,
289                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
290   }
291   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
292   GNUNET_CONTAINER_multihashmap_destroy (set->elements);
293   set->elements = new_elements;
294 }
295
296
297 /**
298  * Destroy the given operation.  Call the implementation-specific cancel function
299  * of the operation.  Disconnects from the remote peer.
300  * Does not disconnect the client, as there may be multiple operations per set.
301  *
302  * @param op operation to destroy
303  */
304 void
305 _GSS_operation_destroy (struct Operation *op)
306 {
307   struct Set *set;
308
309   if (NULL == op->vt)
310     return;
311
312   set = op->spec->set;
313
314   GNUNET_assert (GNUNET_NO == op->is_incoming);
315   GNUNET_assert (NULL != op->spec);
316   GNUNET_CONTAINER_DLL_remove (op->spec->set->ops_head,
317                                op->spec->set->ops_tail,
318                                op);
319
320   op->vt->cancel (op);
321   op->vt = NULL;
322
323   if (NULL != op->spec)
324   {
325     if (NULL != op->spec->context_msg)
326     {
327       GNUNET_free (op->spec->context_msg);
328       op->spec->context_msg = NULL;
329     }
330     GNUNET_free (op->spec);
331     op->spec = NULL;
332   }
333
334   if (NULL != op->mq)
335   {
336     GNUNET_MQ_destroy (op->mq);
337     op->mq = NULL;
338   }
339
340   if (NULL != op->tunnel)
341   {
342     GNUNET_MESH_tunnel_destroy (op->tunnel);
343     op->tunnel = NULL;
344   }
345
346   collect_generation_garbage (set);
347
348   /* We rely on the tunnel end handler to free 'op'. When 'op->tunnel' was NULL,
349    * there was a tunnel end handler that will free 'op' on the call stack. */
350 }
351
352
353 /**
354  * Destroy a set, and free all resources associated with it.
355  *
356  * @param set the set to destroy
357  */
358 static void
359 set_destroy (struct Set *set)
360 {
361   /* If the client is not dead yet, destroy it.
362    * The client's destroy callback will destroy the set again.
363    * We do this so that the tunnel end handler still has a valid set handle
364    * to destroy. */
365   // TODO: use client context
366   if (NULL != set->client)
367   {
368     struct GNUNET_SERVER_Client *client = set->client;
369     set->client = NULL;
370     GNUNET_SERVER_client_disconnect (client);
371     return;
372   }
373   GNUNET_assert (NULL != set->state);
374   while (NULL != set->ops_head)
375     _GSS_operation_destroy (set->ops_head);
376   set->vt->destroy_set (set->state);
377   set->state = NULL;
378   if (NULL != set->client_mq)
379   {
380     GNUNET_MQ_destroy (set->client_mq);
381     set->client_mq = NULL;
382   }
383   if (NULL != set->iter)
384   {
385     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
386     set->iter = NULL;
387   }
388   GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set);
389   if (NULL != set->elements)
390   {
391     GNUNET_CONTAINER_multihashmap_iterate (set->elements,
392                                            destroy_elements_iterator, NULL);
393     GNUNET_CONTAINER_multihashmap_destroy (set->elements);
394     set->elements = NULL;
395   }
396   GNUNET_free (set);
397 }
398
399
400 /**
401  * Clean up after a client has disconnected
402  *
403  * @param cls closure, unused
404  * @param client the client to clean up after
405  */
406 static void
407 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
408 {
409   struct Set *set;
410   struct Listener *listener;
411
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n");
413
414   set = set_get (client);
415   if (NULL != set)
416   {
417     set->client = NULL;
418     set_destroy (set);
419     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "(client's set destroyed)\n");
420   }
421   listener = listener_get (client);
422   if (NULL != listener)
423   {
424     listener->client = NULL;
425     listener_destroy (listener);
426     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "(client's listener destroyed)\n");
427   }
428 }
429
430
431 /**
432  * Destroy an incoming request from a remote peer
433  *
434  * @param incoming remote request to destroy
435  */
436 static void
437 incoming_destroy (struct Operation *incoming)
438 {
439   GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
440   if (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task)
441   {
442     GNUNET_SCHEDULER_cancel (incoming->state->timeout_task);
443     incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK;
444   }
445   GNUNET_free (incoming->state);
446 }
447
448 /**
449  * remove & free state of the operation from the incoming list
450  * 
451  * @param incoming the element to remove
452  */
453
454 static void
455 incoming_retire (struct Operation *incoming)
456 {
457   incoming->is_incoming = GNUNET_NO;
458   GNUNET_free (incoming->state);
459   incoming->state = NULL;
460   GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
461 }
462
463
464 /**
465  * Find a listener that is interested in the given operation type
466  * and application id.
467  *
468  * @param op operation type to look for
469  * @param app_id application id to look for
470  * @return a matching listener, or NULL if no listener matches the
471  *         given operation and application id
472  */
473 static struct Listener *
474 listener_get_by_target (enum GNUNET_SET_OperationType op,
475                         const struct GNUNET_HashCode *app_id)
476 {
477   struct Listener *l;
478
479   for (l = listeners_head; NULL != l; l = l->next)
480   {
481     if (l->operation != op)
482       continue;
483     if (0 != GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id))
484       continue;
485     return l;
486   }
487   return NULL;
488 }
489
490
491 /**
492  * Suggest the given request to the listener,
493  * who can accept or reject the request.
494  *
495  * @param incoming the incoming peer with the request to suggest
496  * @param listener the listener to suggest the request to
497  */
498 static void
499 incoming_suggest (struct Operation *incoming, struct Listener *listener)
500 {
501   struct GNUNET_MQ_Envelope *mqm;
502   struct GNUNET_SET_RequestMessage *cmsg;
503
504   GNUNET_assert (NULL != incoming->spec);
505   GNUNET_assert (0 == incoming->state->suggest_id);
506   incoming->state->suggest_id = suggest_id++;
507
508   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task);
509   GNUNET_SCHEDULER_cancel (incoming->state->timeout_task);
510   incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK;
511   
512   mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
513                                  incoming->spec->context_msg);
514   GNUNET_assert (NULL != mqm);
515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id %u\n",
516               incoming->state->suggest_id);
517   cmsg->accept_id = htonl (incoming->state->suggest_id);
518   cmsg->peer_id = incoming->spec->peer;
519   GNUNET_MQ_send (listener->client_mq, mqm);
520 }
521
522
523 /**
524  * Handle a request for a set operation from
525  * another peer. 
526  * 
527  * This msg is expected as the first and only msg handled through the 
528  * non-operation bound virtual table, acceptance of this operation replaces
529  * our virtual table and subsequent msgs would be routed differently.
530  *
531  * @param op the operation state
532  * @param mh the received message
533  * @return GNUNET_OK if the tunnel should be kept alive,
534  *         GNUNET_SYSERR to destroy the tunnel
535  */
536 static int
537 handle_incoming_msg (struct Operation *op,
538                      const struct GNUNET_MessageHeader *mh)
539 {
540   const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
541   struct Listener *listener;
542   struct OperationSpecification *spec;
543
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got op request\n");
545
546   if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
547   {
548     GNUNET_break_op (0);
549     return GNUNET_SYSERR;
550   }
551
552   /* double operation request */
553   if (NULL != op->spec)
554   {
555     GNUNET_break_op (0);
556     return GNUNET_SYSERR;
557   }
558
559   spec = GNUNET_new (struct OperationSpecification);
560   spec->context_msg = GNUNET_MQ_extract_nested_mh (msg);
561   // for simplicity we just backup the context msg instead of rebuilding it later on
562   if (NULL != spec->context_msg)
563     spec->context_msg = GNUNET_copy_message (spec->context_msg);
564   spec->operation = ntohl (msg->operation);
565   spec->app_id = msg->app_id;
566   spec->salt = ntohl (msg->salt);
567   spec->peer = op->state->peer;
568
569   op->spec = spec;
570
571   if ( (NULL != spec->context_msg) &&
572        (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
573   {
574     GNUNET_break_op (0);
575     return GNUNET_SYSERR;
576   }
577
578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n",
579               ntohl (msg->operation), GNUNET_h2s (&msg->app_id));
580   listener = listener_get_by_target (ntohl (msg->operation), &msg->app_id);
581   if (NULL == listener)
582   {
583     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584                 "no listener matches incoming request, waiting with timeout\n");
585     return GNUNET_OK;
586   }
587   incoming_suggest (op, listener);
588   return GNUNET_OK;
589 }
590
591
592 /**
593  * Send the next element of a set to the set's client.  The next element is given by
594  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
595  * are no more elements in the set.  The caller must ensure that the set's iterator is
596  * valid.
597  *
598  * @param set set that should send its next element to its client
599  */
600 static void
601 send_client_element (struct Set *set)
602 {
603   int ret;
604   struct ElementEntry *ee;
605   struct GNUNET_MQ_Envelope *ev;
606
607   GNUNET_assert (NULL != set->iter);
608   ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, NULL, (const void **) &ee);
609   if (GNUNET_NO == ret)
610   {
611     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
612     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
613     set->iter = NULL;
614   }
615   else
616   {
617     struct GNUNET_SET_IterResponseMessage *msg;
618
619     GNUNET_assert (NULL != ee);
620     ev = GNUNET_MQ_msg_extra (msg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
621     memcpy (&msg[1], ee->element.data, ee->element.size);
622     msg->element_type = ee->element.type;
623   }
624   GNUNET_MQ_send (set->client_mq, ev);
625 }
626
627
628 /**
629  * Called when a client wants to iterate the elements of a set.
630  *
631  * @param cls unused
632  * @param client client that sent the message
633  * @param m message sent by the client
634  */
635 static void
636 handle_client_iterate (void *cls,
637                        struct GNUNET_SERVER_Client *client,
638                        const struct GNUNET_MessageHeader *m)
639 {
640   struct Set *set;
641
642   // iterate over a non existing set
643   set = set_get (client);
644   if (NULL == set)
645   {
646     GNUNET_break (0);
647     GNUNET_SERVER_client_disconnect (client);
648     return;
649   }
650
651   // only one concurrent iterate-action per set
652   if (NULL != set->iter)
653   {
654     GNUNET_break (0);
655     GNUNET_SERVER_client_disconnect (client);
656     return;
657   }
658   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "iterating union set with %u elements\n",
659               GNUNET_CONTAINER_multihashmap_size (set->elements));
660   GNUNET_SERVER_receive_done (client, GNUNET_OK);
661   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->elements);
662   send_client_element (set);
663 }
664
665
666 /**
667  * Called when a client wants to create a new set.
668  *
669  * @param cls unused
670  * @param client client that sent the message
671  * @param m message sent by the client
672  */
673 static void
674 handle_client_create_set (void *cls,
675                           struct GNUNET_SERVER_Client *client,
676                           const struct GNUNET_MessageHeader *m)
677 {
678   struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
679   struct Set *set;
680
681   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client created new set (operation %u)\n",
682               ntohs (msg->operation));
683
684   // max. one set per client!
685   if (NULL != set_get (client)) 
686   {
687     GNUNET_break (0);
688     GNUNET_SERVER_client_disconnect (client);
689     return;
690   }
691
692   set = GNUNET_new (struct Set);
693
694   switch (ntohs (msg->operation))
695   {
696   case GNUNET_SET_OPERATION_INTERSECTION:
697     // FIXME: implement intersection vt
698     // set->vt = _GSS_intersection_vt ();
699     break;
700   case GNUNET_SET_OPERATION_UNION:
701     set->vt = _GSS_union_vt ();
702     break;
703   default:
704     GNUNET_free (set);
705     GNUNET_break (0);
706     GNUNET_SERVER_client_disconnect (client);
707     return;
708   }
709
710   set->state = set->vt->create ();
711   set->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
712   set->client = client;
713   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
714   GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
715   GNUNET_SERVER_receive_done (client, GNUNET_OK);
716 }
717
718
719 /**
720  * Called when a client wants to create a new listener.
721  *
722  * @param cls unused
723  * @param client client that sent the message
724  * @param m message sent by the client
725  */
726 static void
727 handle_client_listen (void *cls,
728                       struct GNUNET_SERVER_Client *client,
729                       const struct GNUNET_MessageHeader *m)
730 {
731   struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m;
732   struct Listener *listener;
733   struct Operation *op;
734
735   // max. one per client!
736   if (NULL != listener_get (client))
737   {
738     GNUNET_break (0);
739     GNUNET_SERVER_client_disconnect (client);
740     return;
741   }
742   
743   listener = GNUNET_new (struct Listener);
744   listener->client = client;
745   listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
746   listener->app_id = msg->app_id;
747   listener->operation = ntohl (msg->operation);
748   GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n",
750               listener->operation, GNUNET_h2s (&listener->app_id));
751   
752   /* check for incoming requests the listener is interested in */
753   for (op = incoming_head; NULL != op; op = op->next)
754   {
755     if (NULL == op->spec)
756     {
757       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n");
758       continue;
759     }
760     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n",
761                 op->spec->operation, GNUNET_h2s (&op->spec->app_id), op->state->suggest_id);
762
763     /* don't consider the incoming request if it has been already suggested to a listener */
764     if (0 != op->state->suggest_id)
765       continue;
766     if (listener->operation != op->spec->operation)
767       continue;
768     if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &op->spec->app_id))
769       continue;
770     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n");
771     incoming_suggest (op, listener);
772   }
773   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n");
774   GNUNET_SERVER_receive_done (client, GNUNET_OK);
775 }
776
777
778 /**
779  * Called when the listening client rejects an operation
780  * request by another peer.
781  *
782  * @param cls unused
783  * @param client client that sent the message
784  * @param m message sent by the client
785  */
786 static void
787 handle_client_reject (void *cls,
788                       struct GNUNET_SERVER_Client *client,
789                       const struct GNUNET_MessageHeader *m)
790 {
791   struct Operation *incoming;
792   const struct GNUNET_SET_AcceptRejectMessage *msg;
793
794   msg = (const struct GNUNET_SET_AcceptRejectMessage *) m;
795   GNUNET_break (0 == ntohl (msg->request_id));
796
797   // no matching incoming operation for this reject
798   incoming = get_incoming (ntohl (msg->accept_reject_id));
799   if (NULL == incoming)
800   {
801     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
802     return;
803   }
804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n");
805   
806   GNUNET_MESH_tunnel_destroy (incoming->tunnel);
807   //tunnel destruction handler called immediately upon destruction
808   GNUNET_SERVER_receive_done (client, GNUNET_OK);
809 }
810
811
812 /**
813  * Called when a client wants to add/remove an element to/from a
814  * set it inhabits.
815  *
816  * @param cls unused
817  * @param client client that sent the message
818  * @param m message sent by the client
819  */
820 static void
821 handle_client_add_remove (void *cls,
822                           struct GNUNET_SERVER_Client *client,
823                           const struct GNUNET_MessageHeader *m)
824 {
825   struct Set *set;
826   const struct GNUNET_SET_ElementMessage *msg;
827   struct GNUNET_SET_Element el;
828   struct ElementEntry *ee;
829
830   // client without a set requested an operation
831   set = set_get (client);
832   if (NULL == set)
833   {
834     GNUNET_break (0);
835     GNUNET_SERVER_client_disconnect (client);
836     return;
837   }
838   GNUNET_SERVER_receive_done (client, GNUNET_OK);
839   msg = (const struct GNUNET_SET_ElementMessage *) m;
840   el.size = ntohs (m->size) - sizeof *msg;
841   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842               "client ins/rem element of size %u\n", el.size);
843   el.data = &msg[1];
844   if (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type))
845   {
846     struct GNUNET_HashCode hash;
847
848     GNUNET_CRYPTO_hash (el.data, el.size, &hash);
849     ee = GNUNET_CONTAINER_multihashmap_get (set->elements, &hash);
850     if (NULL == ee)
851     {
852       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
853       return;
854     }
855     if (GNUNET_YES == ee->removed)
856     {
857       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
858       return;
859     }
860     ee->removed = GNUNET_YES;
861     ee->generation_removed = set->current_generation;
862     set->vt->remove (set->state, ee);
863   }
864   else
865   {
866     struct ElementEntry *ee_dup;
867
868     ee = GNUNET_malloc (el.size + sizeof *ee);
869     ee->element.size = el.size;
870     memcpy (&ee[1], el.data, el.size);
871     ee->element.data = &ee[1];
872     ee->generation_added = set->current_generation;
873     ee->remote = GNUNET_NO;
874     GNUNET_CRYPTO_hash (ee->element.data, el.size, &ee->element_hash);
875     ee_dup = GNUNET_CONTAINER_multihashmap_get (set->elements,
876                                                 &ee->element_hash);
877     if (NULL != ee_dup)
878     {
879       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "element inserted twice, ignoring\n");
880       GNUNET_free (ee);
881       return;
882     }
883     GNUNET_CONTAINER_multihashmap_put (set->elements, &ee->element_hash, ee,
884                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
885     set->vt->add (set->state, ee);
886   }
887 }
888
889
890 /**
891  * Called when a client wants to evaluate a set operation with another peer.
892  *
893  * @param cls unused
894  * @param client client that sent the message
895  * @param m message sent by the client
896  */
897 static void
898 handle_client_evaluate (void *cls,
899                         struct GNUNET_SERVER_Client *client,
900                         const struct GNUNET_MessageHeader *m)
901 {
902   struct Set *set;
903   struct GNUNET_SET_EvaluateMessage *msg;
904   struct OperationSpecification *spec;
905   struct Operation *op;
906
907   set = set_get (client);
908   if (NULL == set)
909   {
910     GNUNET_break (0);
911     GNUNET_SERVER_client_disconnect (client);
912     return;
913   }
914
915   msg = (struct GNUNET_SET_EvaluateMessage *) m;
916   spec = GNUNET_new (struct OperationSpecification);
917   spec->operation = set->operation;
918   spec->app_id = msg->app_id;
919   spec->salt = ntohl (msg->salt);
920   spec->peer = msg->target_peer;
921   spec->set = set;
922   spec->result_mode = ntohs (msg->result_mode);
923   spec->client_request_id = ntohl (msg->request_id);
924   spec->context_msg = GNUNET_MQ_extract_nested_mh (msg);
925   
926   // for simplicity we just backup the context msg instead of rebuilding it later on
927   if (NULL != spec->context_msg)
928     spec->context_msg = GNUNET_copy_message (spec->context_msg);
929
930   op = GNUNET_new (struct Operation);
931   op->spec = spec;
932   op->generation_created = set->current_generation++;
933   op->vt = set->vt;
934   GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
935
936   op->tunnel = GNUNET_MESH_tunnel_create (mesh, op, &msg->target_peer,
937                                           GNUNET_APPLICATION_TYPE_SET,
938                                           GNUNET_YES,
939                                           GNUNET_YES);
940
941   op->mq = GNUNET_MESH_mq_create (op->tunnel);
942
943   set->vt->evaluate (op);
944   GNUNET_SERVER_receive_done (client, GNUNET_OK);
945 }
946
947
948 /**
949  * Handle an ack from a client, and send the next element.
950  * 
951  * @param cls unused
952  * @param client the client
953  * @param m the message
954  */
955 static void
956 handle_client_iter_ack (void *cls,
957                    struct GNUNET_SERVER_Client *client,
958                    const struct GNUNET_MessageHeader *m)
959 {
960   struct Set *set;
961
962   // client without a set requested an operation
963   set = set_get (client);
964   if (NULL == set)
965   {
966     GNUNET_break (0);
967     GNUNET_SERVER_client_disconnect (client);
968     return;
969   }
970
971   // client sent an ack, but we were not expecting one
972   if (NULL == set->iter)
973   {
974     GNUNET_break (0);
975     GNUNET_SERVER_client_disconnect (client);
976     return;
977   }
978
979   GNUNET_SERVER_receive_done (client, GNUNET_OK);
980   send_client_element (set);
981 }
982
983
984 /**
985  * Handle a request from the client to
986  * cancel a running set operation.
987  *
988  * @param cls unused
989  * @param client the client
990  * @param mh the message
991  */
992 static void
993 handle_client_cancel (void *cls,
994                       struct GNUNET_SERVER_Client *client,
995                       const struct GNUNET_MessageHeader *mh)
996 {
997   const struct GNUNET_SET_CancelMessage *msg =
998       (const struct GNUNET_SET_CancelMessage *) mh;
999   struct Set *set;
1000   struct Operation *op;
1001   int found;
1002
1003   // client without a set requested an operation
1004   set = set_get (client);
1005   if (NULL == set)
1006   {
1007     GNUNET_break (0);
1008     GNUNET_SERVER_client_disconnect (client);
1009     return;
1010   }
1011   found = GNUNET_NO;
1012   for (op = set->ops_head; NULL != op; op = op->next)
1013   {
1014     if (op->spec->client_request_id == msg->request_id)
1015     {
1016       found = GNUNET_YES;
1017       break;
1018     }
1019   }
1020
1021   if (GNUNET_NO == found)
1022   {
1023     GNUNET_break (0);
1024     GNUNET_SERVER_client_disconnect (client);
1025     return;
1026   }
1027   
1028   _GSS_operation_destroy (op);
1029 }
1030
1031
1032 /**
1033  * Handle a request from the client to accept
1034  * a set operation that came from a remote peer.
1035  * We forward the accept to the associated operation for handling
1036  * 
1037  * @param cls unused
1038  * @param client the client
1039  * @param mh the message
1040  */
1041 static void
1042 handle_client_accept (void *cls,
1043                       struct GNUNET_SERVER_Client *client,
1044                       const struct GNUNET_MessageHeader *mh)
1045 {
1046   struct Set *set;
1047   struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh;
1048   struct Operation *op;
1049
1050   op = get_incoming (ntohl (msg->accept_reject_id));
1051
1052   // incoming operation does not exist
1053   if (NULL == op)
1054   {
1055     GNUNET_break (0);
1056     GNUNET_SERVER_client_disconnect (client);
1057     return;
1058   }
1059
1060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl (msg->accept_reject_id));
1061
1062   GNUNET_assert (GNUNET_YES == op->is_incoming);
1063
1064   // client without a set requested an operation
1065   set = set_get (client);
1066   
1067   if (NULL == set)
1068   {
1069     GNUNET_break (0);
1070     GNUNET_SERVER_client_disconnect (client);
1071     return;
1072   }
1073
1074   op->spec->set = set;
1075
1076   incoming_retire (op);
1077
1078   GNUNET_assert (NULL != op->spec->set);
1079   GNUNET_assert (NULL != op->spec->set->vt);
1080
1081   GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1082
1083   op->spec->client_request_id = ntohl (msg->request_id);
1084   op->spec->result_mode = ntohs (msg->result_mode);
1085   op->generation_created = set->current_generation++;
1086   op->vt = op->spec->set->vt;
1087   GNUNET_assert (NULL != op->vt->accept);
1088   set->vt->accept (op);
1089   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1090 }
1091
1092
1093 /**
1094  * Called to clean up, after a shutdown has been requested.
1095  *
1096  * @param cls closure
1097  * @param tc context information (why was this task triggered now)
1098  */
1099 static void
1100 shutdown_task (void *cls,
1101                const struct GNUNET_SCHEDULER_TaskContext *tc)
1102 {
1103   while (NULL != incoming_head)
1104     incoming_destroy (incoming_head);
1105
1106   while (NULL != listeners_head)
1107     listener_destroy (listeners_head);
1108
1109   while (NULL != sets_head)
1110     set_destroy (sets_head);
1111
1112   /* it's important to destroy mesh at the end, as all tunnels
1113    * must be destroyed before the mesh handle! */
1114   if (NULL != mesh)
1115   {
1116     GNUNET_MESH_disconnect (mesh);
1117     mesh = NULL;
1118   }
1119
1120   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1121 }
1122
1123
1124 /**
1125  * Timeout happens iff:
1126  *  - we suggested an operation to our listener, 
1127  *    but did not receive a response in time
1128  *  - we got the tunnel from a peer but no GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1129  *  - shutdown (obviously)
1130  * @param cls tunnel context
1131  * @param tc context information (why was this task triggered now)
1132  */
1133 static void
1134 incoming_timeout_cb (void *cls,
1135                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1136 {
1137   struct Operation *incoming = cls;
1138
1139   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1140
1141   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1142     return;
1143
1144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "remote peer timed out\n");
1145   incoming_destroy (incoming);
1146 }
1147
1148
1149 /**
1150  * Terminates an incoming operation in case we have not yet received an
1151  * operation request. Called by the tunnel destruction handler.
1152  * 
1153  * @param op the tunnel context
1154  */
1155 static void
1156 handle_incoming_disconnect (struct Operation *op)
1157 {
1158   GNUNET_assert (GNUNET_YES == op->is_incoming);
1159   
1160   if (NULL == op->tunnel)
1161     return;
1162
1163   incoming_destroy (op);
1164 }
1165
1166
1167 /**
1168  * Method called whenever another peer has added us to a tunnel
1169  * the other peer initiated.
1170  * Only called (once) upon reception of data with a message type which was
1171  * subscribed to in GNUNET_MESH_connect. 
1172  * 
1173  * The tunnel context represents the operation itself and gets added to a DLL,
1174  * from where it gets looked up when our local listener client responds 
1175  * to a proposed/suggested operation or connects and associates with this operation.
1176  *
1177  * @param cls closure
1178  * @param tunnel new handle to the tunnel
1179  * @param initiator peer that started the tunnel
1180  * @param port Port this tunnel is for.
1181  * @return initial tunnel context for the tunnel
1182  *         (can be NULL -- that's not an error)
1183  */
1184 static void *
1185 tunnel_new_cb (void *cls,
1186                struct GNUNET_MESH_Tunnel *tunnel,
1187                const struct GNUNET_PeerIdentity *initiator,
1188                uint32_t port)
1189 {
1190   struct Operation *incoming;
1191   static const struct SetVT incoming_vt = {
1192     .msg_handler = handle_incoming_msg,
1193     .peer_disconnect = handle_incoming_disconnect
1194   };
1195
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n");
1197
1198   GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
1199   incoming = GNUNET_new (struct Operation);
1200   incoming->is_incoming = GNUNET_YES;
1201   incoming->state = GNUNET_new (struct OperationState);
1202   incoming->state->peer = *initiator;
1203   incoming->tunnel = tunnel;
1204   incoming->mq = GNUNET_MESH_mq_create (incoming->tunnel);
1205   incoming->vt = &incoming_vt;
1206   incoming->state->timeout_task =
1207       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming);
1208   GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
1209
1210   return incoming;
1211 }
1212
1213
1214 /**
1215  * Function called whenever a tunnel is destroyed.  Should clean up
1216  * any associated state.
1217  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1218  * the tunnel.
1219  * 
1220  * The peer_disconnect function is part of a a virtual table set initially either 
1221  * when a peer creates a new tunnel with us (tunnel_new_cb), or once we create
1222  * a new tunnel ourselves (evaluate). 
1223  * 
1224  * Once we know the exact type of operation (union/intersection), the vt is 
1225  * replaced with an operation specific instance (_GSS_[op]_vt).
1226  *
1227  * @param cls closure (set from GNUNET_MESH_connect)
1228  * @param tunnel connection to the other end (henceforth invalid)
1229  * @param tunnel_ctx place where local state associated
1230  *                   with the tunnel is stored
1231  */
1232 static void
1233 tunnel_end_cb (void *cls,
1234                const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
1235 {
1236   struct Operation *op = tunnel_ctx;
1237
1238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tunnel end cb called\n");
1239
1240   op->tunnel = NULL;
1241
1242   if (NULL != op->vt)
1243     op->vt->peer_disconnect (op);
1244   /* mesh will never call us with the context again! */
1245   GNUNET_free (tunnel_ctx);
1246 }
1247
1248
1249 /**
1250  * Functions with this signature are called whenever any message is
1251  * received via the mesh tunnel.
1252  *
1253  * The msg_handler is a virtual table set in initially either when a peer 
1254  * creates a new tunnel with us (tunnel_new_cb), or once we create a new tunnel 
1255  * ourselves (evaluate). 
1256  * 
1257  * Once we know the exact type of operation (union/intersection), the vt is 
1258  * replaced with an operation specific instance (_GSS_[op]_vt).
1259  *
1260  * @param cls Closure (set from GNUNET_MESH_connect).
1261  * @param tunnel Connection to the other end.
1262  * @param tunnel_ctx Place to store local state associated with the tunnel.
1263  * @param message The actual message.
1264  *
1265  * @return GNUNET_OK to keep the tunnel open,
1266  *         GNUNET_SYSERR to close it (signal serious error).
1267  */
1268 static int
1269 dispatch_p2p_message (void *cls,
1270                       struct GNUNET_MESH_Tunnel *tunnel,
1271                       void **tunnel_ctx,
1272                       const struct GNUNET_MessageHeader *message)
1273 {
1274   struct Operation *op = *tunnel_ctx;
1275   int ret;
1276
1277   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n",
1278               ntohs (message->type));
1279   /* do this before the handler, as the handler might kill the tunnel */
1280   GNUNET_MESH_receive_done (tunnel);
1281   ret = op->vt->msg_handler (op, message);
1282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n",
1283               ntohs (message->type));
1284   return ret;
1285 }
1286
1287
1288 /**
1289  * Function called by the service's run
1290  * method to run service-specific setup code.
1291  *
1292  * @param cls closure
1293  * @param server the initialized server
1294  * @param cfg configuration to use
1295  */
1296 static void
1297 run (void *cls, struct GNUNET_SERVER_Handle *server,
1298      const struct GNUNET_CONFIGURATION_Handle *cfg)
1299 {
1300   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1301     {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1302         sizeof (struct GNUNET_SET_AcceptRejectMessage)},
1303     {handle_client_iter_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ITER_ACK, 0},
1304     {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
1305     {handle_client_create_set, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE,
1306         sizeof (struct GNUNET_SET_CreateMessage)},
1307     {handle_client_iterate, NULL, GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1308         sizeof (struct GNUNET_MessageHeader)},
1309     {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
1310     {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN,
1311         sizeof (struct GNUNET_SET_ListenMessage)},
1312     {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT,
1313         sizeof (struct GNUNET_SET_AcceptRejectMessage)},
1314     {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
1315     {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL,
1316         sizeof (struct GNUNET_SET_CancelMessage)},
1317     {NULL, NULL, 0, 0}
1318   };
1319   static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
1320     {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1321     {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1322     {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1323     {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
1324     {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1325     {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1326     {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
1327     {NULL, 0, 0}
1328   };
1329   static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
1330
1331   configuration = cfg;
1332   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1333                                 &shutdown_task, NULL);
1334   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
1335   GNUNET_SERVER_add_handlers (server, server_handlers);
1336
1337   mesh = GNUNET_MESH_connect (cfg, NULL, tunnel_new_cb, tunnel_end_cb,
1338                               mesh_handlers, mesh_ports);
1339   if (NULL == mesh)
1340   {
1341     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not connect to mesh\n");
1342     return;
1343   }
1344
1345   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "started\n");
1346 }
1347
1348
1349 /**
1350  * The main function for the set service.
1351  *
1352  * @param argc number of arguments from the command line
1353  * @param argv command line arguments
1354  * @return 0 ok, 1 on error
1355  */
1356 int
1357 main (int argc, char *const *argv)
1358 {
1359   int ret;
1360   ret = GNUNET_SERVICE_run (argc, argv, "set",
1361                             GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1362   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1363   return (GNUNET_OK == ret) ? 0 : 1;
1364 }
1365