If 0 max search, use r5n dht
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
36                           do { \
37                             k = ceil(1 + log2((double) (2*B / (double) A)));\
38                             if (k<1) k=1; /* k can be calculated as 0 */\
39                             s = ceil((double) (A * k / log(2))); \
40                           } while (0)
41
42 /**
43  * Current phase we are in for a intersection operation.
44  */
45 enum IntersectionOperationPhase
46 {
47   /**
48    * Alices has suggested an operation to bob,
49    * and is waiting for a bf or session end.
50    */
51   PHASE_INITIAL,
52   /**
53    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
54    * until one notices the their element count is equal
55    */
56   PHASE_BF_EXCHANGE,
57   /**
58    * if both peers have an equal peercount, they enter this state for
59    * one more turn, to see if they actually have agreed on a correct set.
60    * if a peer finds the same element count after the next iteration,
61    * it ends the the session
62    */
63   PHASE_MAYBE_FINISHED,
64   /**
65    * The protocol is over.
66    * Results may still have to be sent to the client.
67    */
68   PHASE_FINISHED
69 };
70
71
72 /**
73  * State of an evaluate operation
74  * with another peer.
75  */
76 struct OperationState
77 {
78   /**
79    * The bf we currently receive
80    */
81   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
82
83   /**
84    * BF of the set's element.
85    */
86   struct GNUNET_CONTAINER_BloomFilter *local_bf;
87
88   /**
89    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
90    */
91   char * bf_data;
92
93   /**
94    * size of the bloomfilter
95    */
96   uint32_t bf_data_size;
97
98   /**
99    * size of the bloomfilter
100    */
101   uint32_t bf_bits_per_element;
102
103   /**
104    * Current state of the operation.
105    */
106   enum IntersectionOperationPhase phase;
107
108   /**
109    * Generation in which the operation handle
110    * was created.
111    */
112   unsigned int generation_created;
113
114   /**
115    * Maps element-id-hashes to 'elements in our set'.
116    */
117   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
118
119   /**
120    * Current element count contained within contained_elements
121    */
122   uint32_t my_element_count;
123
124   /**
125    * Iterator for sending elements on the key to element mapping to the client.
126    */
127   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
128
129   /**
130    * Evaluate operations are held in
131    * a linked list.
132    */
133   struct OperationState *next;
134
135    /**
136     * Evaluate operations are held in
137     * a linked list.
138     */
139   struct OperationState *prev;
140
141   /**
142    * Did we send the client that we are done?
143    */
144   int client_done_sent;
145 };
146
147
148 /**
149  * Extra state required for efficient set intersection.
150  */
151 struct SetState
152 {
153   /**
154    * Number of currently valid elements in the set which have not been removed
155    */
156   uint32_t current_set_element_count;
157 };
158
159
160 /**
161  * Send a result message to the client indicating
162  * we removed an element
163  *
164  * @param op union operation
165  * @param element element to send
166  */
167 static void
168 send_client_element (struct Operation *op,
169                      struct GNUNET_SET_Element *element)
170 {
171   struct GNUNET_MQ_Envelope *ev;
172   struct GNUNET_SET_ResultMessage *rm;
173
174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending removed element (size %u) to client\n", element->size);
175   GNUNET_assert (0 != op->spec->client_request_id);
176   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
177   if (NULL == ev)
178   {
179     GNUNET_MQ_discard (ev);
180     GNUNET_break (0);
181     return;
182   }
183   rm->result_status = htons (GNUNET_SET_STATUS_OK);
184   rm->request_id = htonl (op->spec->client_request_id);
185   rm->element_type = element->type;
186   memcpy (&rm[1], element->data, element->size);
187   GNUNET_MQ_send (op->spec->set->client_mq, ev);
188 }
189
190
191 /**
192  * Alice's version:
193  *
194  * fills the contained-elements hashmap with all relevant
195  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
196  *
197  * @param cls closure
198  * @param key current key code
199  * @param value value in the hash map
200  * @return #GNUNET_YES if we should continue to
201  *         iterate,
202  *         #GNUNET_NO if not.
203  */
204 static int
205 iterator_initialization_by_alice (void *cls,
206                                   const struct GNUNET_HashCode *key,
207                                   void *value)
208 {
209   struct ElementEntry *ee = value;
210   struct Operation *op = cls;
211   struct GNUNET_HashCode mutated_hash;
212
213   //only consider this element, if it is valid for us
214   if ((op->generation_created < ee->generation_removed)
215        && (op->generation_created >= ee->generation_added))
216     return GNUNET_YES;
217
218   // not contained according to bob's bloomfilter
219   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
220                            op->spec->salt,
221                            &mutated_hash);
222   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
223                                                       &mutated_hash)){
224     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
225       send_client_element (op, &ee->element);
226     return GNUNET_YES;
227   }
228
229   op->state->my_element_count++;
230   GNUNET_assert (GNUNET_YES ==
231                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
232                                                     &ee->element_hash, ee,
233                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
234
235   return GNUNET_YES;
236 }
237
238 /**
239  * fills the contained-elements hashmap with all relevant
240  * elements and adds their mutated hashes to our local bloomfilter
241  *
242  * @param cls closure
243  * @param key current key code
244  * @param value value in the hash map
245  * @return #GNUNET_YES if we should continue to
246  *         iterate,
247  *         #GNUNET_NO if not.
248  */
249 static int
250 iterator_initialization (void *cls,
251                          const struct GNUNET_HashCode *key,
252                          void *value)
253 {
254   struct ElementEntry *ee = value;
255   struct Operation *op = cls;
256
257   //only consider this element, if it is valid for us
258   if ((op->generation_created < ee->generation_removed)
259        && (op->generation_created >= ee->generation_added))
260     return GNUNET_YES;
261
262   GNUNET_assert (GNUNET_YES ==
263                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
264                                                     &ee->element_hash, ee,
265                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
266   return GNUNET_YES;
267 }
268
269
270 /**
271  * removes element from a hashmap if it is not contained within the
272  * provided remote bloomfilter. Then, fill our new bloomfilter.
273  *
274  * @param cls closure
275  * @param key current key code
276  * @param value value in the hash map
277  * @return #GNUNET_YES if we should continue to
278  *         iterate,
279  *         #GNUNET_NO if not.
280  */
281 static int
282 iterator_bf_reduce (void *cls,
283                    const struct GNUNET_HashCode *key,
284                    void *value)
285 {
286   struct ElementEntry *ee = value;
287   struct Operation *op = cls;
288   struct GNUNET_HashCode mutated_hash;
289
290   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
291
292   if (GNUNET_NO ==
293       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
294                                          &mutated_hash))
295   {
296     op->state->my_element_count--;
297     GNUNET_assert (GNUNET_YES ==
298                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
299                                                          &ee->element_hash,
300                                                          ee));
301     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
302       send_client_element (op, &ee->element);
303   }
304
305   return GNUNET_YES;
306 }
307
308 /**
309  * create a bloomfilter based on the elements given
310  *
311  * @param cls closure
312  * @param key current key code
313  * @param value value in the hash map
314  * @return #GNUNET_YES if we should continue to
315  *         iterate,
316  *         #GNUNET_NO if not.
317  */
318 static int
319 iterator_bf_create (void *cls,
320                    const struct GNUNET_HashCode *key,
321                    void *value)
322 {
323   struct ElementEntry *ee = value;
324   struct Operation *op = cls;
325   struct GNUNET_HashCode mutated_hash;
326
327   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
328
329   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
330                                     &mutated_hash);
331   return GNUNET_YES;
332 }
333
334 /**
335  * Inform the client that the union operation has failed,
336  * and proceed to destroy the evaluate operation.
337  *
338  * @param op the intersection operation to fail
339  */
340 static void
341 fail_intersection_operation (struct Operation *op)
342 {
343   struct GNUNET_MQ_Envelope *ev;
344   struct GNUNET_SET_ResultMessage *msg;
345
346   if (op->state->my_elements){
347     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
348     op->state->my_elements = NULL;
349   }
350   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
351
352   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
353   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
354   msg->request_id = htonl (op->spec->client_request_id);
355   msg->element_type = htons (0);
356   GNUNET_MQ_send (op->spec->set->client_mq, ev);
357   _GSS_operation_destroy (op);
358 }
359
360
361 /**
362  * Send a request for the evaluate operation to a remote peer
363  *
364  * @param op operation with the other peer
365  */
366 static void
367 send_operation_request (struct Operation *op)
368 {
369   struct GNUNET_MQ_Envelope *ev;
370   struct OperationRequestMessage *msg;
371
372   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
373                                 op->spec->context_msg);
374
375   if (NULL == ev)
376   {
377     /* the context message is too large */
378     GNUNET_break (0);
379     GNUNET_SERVER_client_disconnect (op->spec->set->client);
380     return;
381   }
382   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
383   msg->app_id = op->spec->app_id;
384   msg->salt = htonl (op->spec->salt);
385   msg->element_count = htonl(op->state->my_element_count);
386
387   GNUNET_MQ_send (op->mq, ev);
388
389   if (NULL != op->spec->context_msg)
390     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
391   else
392     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
393
394   if (NULL != op->spec->context_msg)
395   {
396     GNUNET_free (op->spec->context_msg);
397     op->spec->context_msg = NULL;
398   }
399 }
400
401 static void
402 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
403 {
404   struct GNUNET_MQ_Envelope *ev;
405   struct BFPart *msg;
406   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
407   uint32_t todo_size = op->state->bf_data_size - offset;
408
409   if (todo_size < chunk_size)
410     chunk_size = todo_size;
411
412   ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
413
414   msg->chunk_length = htonl (chunk_size);
415   msg->chunk_offset = htonl (offset);
416   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
417
418   GNUNET_MQ_send (op->mq, ev);
419
420   if (op->state->bf_data_size == offset + chunk_size)
421   {
422     // done
423     GNUNET_free(op->state->bf_data);
424     op->state->bf_data = NULL;
425     return;
426   }
427
428   send_bloomfilter_multipart (op, offset + chunk_size);
429 }
430
431 /**
432  * Send a bloomfilter to our peer.
433  * that the operation is over.
434  * After the result done message has been sent to the client,
435  * destroy the evaluate operation.
436  *
437  * @param op intersection operation
438  */
439 static void
440 send_bloomfilter (struct Operation *op)
441 {
442   struct GNUNET_MQ_Envelope *ev;
443   struct BFMessage *msg;
444   uint32_t bf_size;
445   uint32_t bf_elementbits;
446   uint32_t chunk_size;
447   struct GNUNET_CONTAINER_BloomFilter * local_bf;
448
449   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
450
451   CALCULATE_BF_SIZE(op->state->my_element_count,
452                     op->spec->remote_element_count,
453                     bf_size,
454                     bf_elementbits);
455
456   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
457                                                 bf_size,
458                                                 bf_elementbits);
459
460   op->spec->salt++;
461   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
462                                          &iterator_bf_create,
463                                          op);
464
465   // send our bloomfilter
466   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) {
467     // singlepart
468     chunk_size = bf_size;
469     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
470     GNUNET_assert (GNUNET_SYSERR !=
471                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
472                                                               (char*)&msg[1],
473                                                               bf_size));
474   }
475   else {
476     //multipart
477     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
478     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
479     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
480     GNUNET_assert (GNUNET_SYSERR !=
481                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
482                                                               op->state->bf_data,
483                                                               bf_size));
484     memcpy (&msg[1], op->state->bf_data, chunk_size);
485     op->state->bf_data_size = bf_size;
486   }
487   GNUNET_CONTAINER_bloomfilter_free (local_bf);
488
489   msg->sender_element_count = htonl (op->state->my_element_count);
490   msg->bloomfilter_total_length = htonl (bf_size);
491   msg->bloomfilter_length = htonl (chunk_size);
492   msg->bits_per_element = htonl (bf_elementbits);
493   msg->sender_mutator = htonl (op->spec->salt);
494
495   GNUNET_MQ_send (op->mq, ev);
496
497   if (op->state->bf_data)
498     send_bloomfilter_multipart (op, chunk_size);
499 }
500
501
502 /**
503  * Signal to the client that the operation has finished and
504  * destroy the operation.
505  *
506  * @param cls operation to destroy
507  */
508 static void
509 send_client_done_and_destroy (void *cls)
510 {
511   struct Operation *op = cls;
512   struct GNUNET_MQ_Envelope *ev;
513   struct GNUNET_SET_ResultMessage *rm;
514   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
515   rm->request_id = htonl (op->spec->client_request_id);
516   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
517   rm->element_type = htons (0);
518   GNUNET_MQ_send (op->spec->set->client_mq, ev);
519   _GSS_operation_destroy (op);
520 }
521
522
523 /**
524  * Send all elements in the full result iterator.
525  *
526  * @param cls operation
527  */
528 static void
529 send_remaining_elements (void *cls)
530 {
531   struct Operation *op = cls;
532   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
533   struct GNUNET_MQ_Envelope *ev;
534   struct GNUNET_SET_ResultMessage *rm;
535   struct GNUNET_SET_Element *element;
536   int res;
537
538   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
539   if (GNUNET_NO == res) {
540     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
541     send_client_done_and_destroy (op);
542     return;
543   }
544
545   element = &remaining->element;
546   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
547   GNUNET_assert (0 != op->spec->client_request_id);
548
549   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
550   GNUNET_assert (NULL != ev);
551
552   rm->result_status = htons (GNUNET_SET_STATUS_OK);
553   rm->request_id = htonl (op->spec->client_request_id);
554   rm->element_type = element->type;
555   memcpy (&rm[1], element->data, element->size);
556
557   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
558   GNUNET_MQ_send (op->spec->set->client_mq, ev);
559 }
560
561
562 /**
563  * Inform the peer that this operation is complete.
564  *
565  * @param op the intersection operation to fail
566  */
567 static void
568 send_peer_done (struct Operation *op)
569 {
570   struct GNUNET_MQ_Envelope *ev;
571
572   op->state->phase = PHASE_FINISHED;
573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
574   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
575   op->state->local_bf = NULL;
576
577   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
578   GNUNET_MQ_send (op->mq, ev);
579 }
580
581
582 /**
583  * Process a Bloomfilter once we got all the chunks
584  *
585  * @param op the intersection operation
586  */
587 static void
588 process_bf (struct Operation *op){
589   uint32_t old_elements;
590   uint32_t peer_elements;
591
592   old_elements = op->state->my_element_count;
593   peer_elements = op->spec->remote_element_count;
594   switch (op->state->phase)
595   {
596   case PHASE_INITIAL:
597     // If we are ot our first msg
598     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
599
600     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
601                                            &iterator_initialization_by_alice,
602                                            op);
603     break;
604   case PHASE_BF_EXCHANGE:
605   case PHASE_MAYBE_FINISHED:
606     // if we are bob or alice and are continuing operation
607     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
608                                            &iterator_bf_reduce,
609                                            op);
610     break;
611   default:
612     GNUNET_break_op (0);
613     fail_intersection_operation(op);
614   }
615   // the iterators created a new BF with salt+1
616   // the peer needs this information for decoding the next BF
617   // this behavior can be modified at will later on.
618   op->spec->salt++;
619
620   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
621   op->state->remote_bf = NULL;
622
623   if ((0 == op->state->my_element_count) // fully disjoint
624       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
625           && (old_elements == op->state->my_element_count)
626           && (op->state->my_element_count == peer_elements))) {
627     // In the last round we though we were finished, we now know this is correct
628     send_peer_done (op);
629     return;
630   }
631
632   op->state->phase = PHASE_BF_EXCHANGE;
633   if (op->state->my_element_count == peer_elements)
634     // maybe we are finished, but we do one more round to make certain
635     // we don't have false positives ...
636     op->state->phase = PHASE_MAYBE_FINISHED;
637
638   send_bloomfilter (op);
639 }
640
641
642 /**
643  * Handle an BF multipart message from a remote peer.
644  *
645  * @param cls the intersection operation
646  * @param mh the header of the message
647  */
648 static void
649 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
650 {
651   struct Operation *op = cls;
652   const struct BFPart *msg = (const struct BFPart *) mh;
653   uint32_t chunk_size;
654   uint32_t chunk_offset;
655
656   chunk_size = ntohl(msg->chunk_length);
657   chunk_offset = ntohl(msg->chunk_offset);
658
659   if ((NULL == op->state->bf_data)
660        || (op->state->bf_data_size < chunk_size + chunk_offset)){
661     // unexpected multipart chunk
662     GNUNET_break_op (0);
663     fail_intersection_operation(op);
664     return;
665   }
666
667   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
668
669   if (op->state->bf_data_size != chunk_offset + chunk_size)
670     // wait for next chunk
671     return;
672
673   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
674                                                             op->state->bf_data_size,
675                                                             op->state->bf_bits_per_element);
676
677   GNUNET_free (op->state->bf_data);
678   op->state->bf_data = NULL;
679
680   process_bf (op);
681 }
682
683
684 /**
685  * Handle an BF message from a remote peer.
686  *
687  * @param cls the intersection operation
688  * @param mh the header of the message
689  */
690 static void
691 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
692 {
693   struct Operation *op = cls;
694   const struct BFMessage *msg = (const struct BFMessage *) mh;
695   uint32_t bf_size;
696   uint32_t chunk_size;
697   uint32_t bf_bits_per_element;
698
699   switch (op->state->phase)
700   {
701   case PHASE_INITIAL:
702   case PHASE_BF_EXCHANGE:
703   case PHASE_MAYBE_FINISHED:
704     if (NULL == op->state->bf_data) {
705       // no colliding multipart transaction going on currently
706       op->spec->salt = ntohl (msg->sender_mutator);
707       bf_size = ntohl (msg->bloomfilter_total_length);
708       bf_bits_per_element = ntohl (msg->bits_per_element);
709       chunk_size = ntohl (msg->bloomfilter_length);
710       op->spec->remote_element_count = ntohl(msg->sender_element_count);
711       if (bf_size == chunk_size) {
712         // single part, done here
713         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
714                                                                   bf_size,
715                                                                   bf_bits_per_element);
716         process_bf (op);
717         return;
718       }
719
720       //first multipart chunk
721       op->state->bf_data = GNUNET_malloc (bf_size);
722       op->state->bf_data_size = bf_size;
723       op->state->bf_bits_per_element = bf_bits_per_element;
724       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
725       return;
726     }
727   default:
728     GNUNET_break_op (0);
729     fail_intersection_operation (op);
730   }
731 }
732
733
734 /**
735  * Handle an BF message from a remote peer.
736  *
737  * @param cls the intersection operation
738  * @param mh the header of the message
739  */
740 static void
741 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
742 {
743   struct Operation *op = cls;
744   struct BFMessage *msg = (struct BFMessage *) mh;
745
746   op->spec->remote_element_count = ntohl(msg->sender_element_count);
747   if ((op->state->phase != PHASE_INITIAL)
748       || (op->state->my_element_count > op->spec->remote_element_count)
749           || (0 == op->state->my_element_count)
750               || (0 == op->spec->remote_element_count)){
751     GNUNET_break_op (0);
752     fail_intersection_operation(op);
753     return;
754   }
755
756   op->state->phase = PHASE_BF_EXCHANGE;
757   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
758
759   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
760                                          &iterator_initialization,
761                                          op);
762
763   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
764   op->state->remote_bf = NULL;
765   
766   if (op->state->my_element_count == ntohl (msg->sender_element_count))
767     op->state->phase = PHASE_MAYBE_FINISHED;
768
769   send_bloomfilter (op);
770 }
771
772
773 /**
774  * Send our element count to the peer, in case our element count is lower than his
775  *
776  * @param op intersection operation
777  */
778 static void
779 send_element_count (struct Operation *op)
780 {
781   struct GNUNET_MQ_Envelope *ev;
782   struct BFMessage *msg;
783
784   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
785
786   // just send our element count, as the other peer must start
787   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
788   msg->sender_element_count = htonl (op->state->my_element_count);
789   msg->bloomfilter_length = htonl (0);
790   msg->sender_mutator = htonl (0);
791
792   GNUNET_MQ_send (op->mq, ev);
793 }
794
795
796 /**
797  * Send a result message to the client indicating
798  * that the operation is over.
799  * After the result done message has been sent to the client,
800  * destroy the evaluate operation.
801  *
802  * @param op intersection operation
803  */
804 static void
805 finish_and_destroy (struct Operation *op)
806 {
807   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
808
809   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
810   {
811     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
812     op->state->full_result_iter =
813         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
814     send_remaining_elements (op);
815     return;
816   }
817   send_client_done_and_destroy (op);
818 }
819
820
821 /**
822  * Handle a done message from a remote peer
823  *
824  * @param cls the union operation
825  * @param mh the message
826  */
827 static void
828 handle_p2p_done (void *cls,
829                  const struct GNUNET_MessageHeader *mh)
830 {
831   struct Operation *op = cls;
832
833   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
834     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
835
836     finish_and_destroy (op);
837     return;
838   }
839
840   GNUNET_break_op (0);
841   fail_intersection_operation (op);
842 }
843
844
845 /**
846  * Evaluate a union operation with
847  * a remote peer.
848  *
849  * @param op operation to evaluate
850  */
851 static void
852 intersection_evaluate (struct Operation *op)
853 {
854   op->state = GNUNET_new (struct OperationState);
855   /* we started the operation, thus we have to send the operation request */
856   op->state->phase = PHASE_INITIAL;
857   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
858   op->state->my_element_count = op->spec->set->state->current_set_element_count;
859
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "evaluating intersection operation");
862   send_operation_request (op);
863 }
864
865
866 /**
867  * Accept an union operation request from a remote peer.
868  * Only initializes the private operation state.
869  *
870  * @param op operation that will be accepted as a union operation
871  */
872 static void
873 intersection_accept (struct Operation *op)
874 {
875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
876   op->state = GNUNET_new (struct OperationState);
877   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
878   op->state->my_element_count = op->spec->set->state->current_set_element_count;
879
880   // if Alice (the peer) has more elements than Bob (us), she should start
881   if (op->spec->remote_element_count < op->state->my_element_count){
882     op->state->phase = PHASE_INITIAL;
883     send_element_count(op);
884     return;
885   }
886   // create a new bloomfilter in case we have fewer elements
887   op->state->phase = PHASE_BF_EXCHANGE;
888   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
889                                                            BLOOMFILTER_SIZE,
890                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
891   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
892                                          &iterator_initialization,
893                                          op);
894   send_bloomfilter (op);
895 }
896
897
898 /**
899  * Create a new set supporting the intersection operation
900  *
901  * @return the newly created set
902  */
903 static struct SetState *
904 intersection_set_create ()
905 {
906   struct SetState *set_state;
907
908   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909               "intersection set created\n");
910   set_state = GNUNET_new (struct SetState);
911   set_state->current_set_element_count = 0;
912
913   return set_state;
914 }
915
916
917 /**
918  * Add the element from the given element message to the set.
919  *
920  * @param set_state state of the set want to add to
921  * @param ee the element to add to the set
922  */
923 static void
924 intersection_add (struct SetState *set_state,
925                   struct ElementEntry *ee)
926 {
927   set_state->current_set_element_count++;
928 }
929
930
931 /**
932  * Destroy a set that supports the intersection operation
933  *
934  * @param set_state the set to destroy
935  */
936 static void
937 intersection_set_destroy (struct SetState *set_state)
938 {
939   GNUNET_free (set_state);
940 }
941
942
943 /**
944  * Remove the element given in the element message from the set.
945  *
946  * @param set_state state of the set to remove from
947  * @param element set element to remove
948  */
949 static void
950 intersection_remove (struct SetState *set_state,
951                      struct ElementEntry *element)
952 {
953   GNUNET_assert(0 < set_state->current_set_element_count);
954   set_state->current_set_element_count--;
955 }
956
957
958 /**
959  * Dispatch messages for a intersection operation.
960  *
961  * @param op the state of the intersection evaluate operation
962  * @param mh the received message
963  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
964  *         #GNUNET_OK otherwise
965  */
966 static int
967 intersection_handle_p2p_message (struct Operation *op,
968                                  const struct GNUNET_MessageHeader *mh)
969 {
970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
971               ntohs (mh->type), ntohs (mh->size));
972   switch (ntohs (mh->type))
973   {
974     /* this message handler is not active until after we received an
975      * operation request message, thus the ops request is not handled here
976      */
977   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
978     handle_p2p_element_info (op, mh);
979     break;
980   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
981     handle_p2p_bf (op, mh);
982     break;
983   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
984     handle_p2p_bf_part (op, mh);
985     break;
986   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
987     handle_p2p_done (op, mh);
988     break;
989   default:
990     /* something wrong with cadet's message handlers? */
991     GNUNET_assert (0);
992   }
993   return GNUNET_OK;
994 }
995
996
997 /**
998  * handler for peer-disconnects, notifies the client about the aborted operation
999  *
1000  * @param op the destroyed operation
1001  */
1002 static void
1003 intersection_peer_disconnect (struct Operation *op)
1004 {
1005   if (PHASE_FINISHED != op->state->phase)
1006   {
1007     struct GNUNET_MQ_Envelope *ev;
1008     struct GNUNET_SET_ResultMessage *msg;
1009
1010     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1011     msg->request_id = htonl (op->spec->client_request_id);
1012     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1013     msg->element_type = htons (0);
1014     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1015     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1016     _GSS_operation_destroy (op);
1017     return;
1018   }
1019   // else: the session has already been concluded
1020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1021   if (GNUNET_NO == op->state->client_done_sent)
1022     finish_and_destroy (op);
1023 }
1024
1025
1026 /**
1027  * Destroy the union operation.  Only things specific to the union operation are destroyed.
1028  *
1029  * @param op union operation to destroy
1030  */
1031 static void
1032 intersection_op_cancel (struct Operation *op)
1033 {
1034   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
1035   /* check if the op was canceled twice */
1036   GNUNET_assert (NULL != op->state);
1037   if (NULL != op->state->remote_bf)
1038   {
1039     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1040     op->state->remote_bf = NULL;
1041   }
1042   if (NULL != op->state->local_bf)
1043   {
1044     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1045     op->state->local_bf = NULL;
1046   }
1047 /*  if (NULL != op->state->my_elements)
1048   {
1049     // no need to free the elements, they are still part of the set
1050     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1051     op->state->my_elements = NULL;
1052   }*/
1053   GNUNET_free (op->state);
1054   op->state = NULL;
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
1056 }
1057
1058 const struct SetVT *
1059 _GSS_intersection_vt ()
1060 {
1061   static const struct SetVT intersection_vt = {
1062     .create = &intersection_set_create,
1063     .msg_handler = &intersection_handle_p2p_message,
1064     .add = &intersection_add,
1065     .remove = &intersection_remove,
1066     .destroy_set = &intersection_set_destroy,
1067     .evaluate = &intersection_evaluate,
1068     .accept = &intersection_accept,
1069     .peer_disconnect = &intersection_peer_disconnect,
1070     .cancel = &intersection_op_cancel,
1071   };
1072
1073   return &intersection_vt;
1074 }