- distribute peers equally among island nodes on SuperMUC
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "set_protocol.h"
30
31
32 /**
33  * Configuration of our local peer.
34  */
35 const struct GNUNET_CONFIGURATION_Handle *configuration;
36
37 /**
38  * Socket listening for other peers via stream.
39  */
40 static struct GNUNET_STREAM_ListenSocket *stream_listen_socket;
41
42 /**
43  * Sets are held in a doubly linked list.
44  */
45 static struct Set *sets_head;
46
47 /**
48  * Sets are held in a doubly linked list.
49  */
50 static struct Set *sets_tail;
51
52 /**
53  * Listeners are held in a doubly linked list.
54  */
55 static struct Listener *listeners_head;
56
57 /**
58  * Listeners are held in a doubly linked list.
59  */
60 static struct Listener *listeners_tail;
61
62 /**
63  * Incoming sockets from remote peers are
64  * held in a doubly linked list.
65  */
66 static struct Incoming *incoming_head;
67
68 /**
69  * Incoming sockets from remote peers are
70  * held in a doubly linked list.
71  */
72 static struct Incoming *incoming_tail;
73
74 /**
75  * Counter for allocating unique request IDs for clients.
76  * Used to identify incoming requests from remote peers.
77  */
78 static uint32_t request_id = 1;
79
80
81 /**
82  * Disconnect a client and free all resources
83  * that the client allocated (e.g. Sets or Listeners)
84  *
85  * @param client the client to disconnect
86  */
87 void
88 _GSS_client_disconnect (struct GNUNET_SERVER_Client *client)
89 {
90   /* FIXME: clean up any data structures belonging to the client */
91   GNUNET_SERVER_client_disconnect (client);
92 }
93
94
95 /**
96  * Get set that is owned by the client, if any.
97  *
98  * @param client client to look for
99  * @return set that the client owns, NULL if the client
100  *         does not own a set
101  */
102 static struct Set *
103 get_set (struct GNUNET_SERVER_Client *client)
104 {
105   struct Set *set;
106   for (set = sets_head; NULL != set; set = set->next)
107     if (set->client == client)
108       return set;
109   return NULL;
110 }
111
112
113 /**
114  * Get the listener associated to a client, if any.
115  *
116  * @param client the client
117  * @return listener associated with the client, NULL
118  *         if there isn't any
119  */
120 static struct Listener *
121 get_listener (struct GNUNET_SERVER_Client *client)
122 {
123   struct Listener *listener;
124   for (listener = listeners_head; NULL != listener; listener = listener->next)
125     if (listener->client == client)
126       return listener;
127   return NULL;
128 }
129
130
131 /**
132  * Get the incoming socket associated with the given id.
133  *
134  * @param id id to look for
135  * @return the incoming socket associated with the id,
136  *         or NULL if there is none
137  */
138 static struct Incoming *
139 get_incoming (uint32_t id)
140 {
141   struct Incoming *incoming;
142   for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
143     if (incoming->request_id == id)
144       return incoming;
145   return NULL;
146 }
147
148
149 /**
150  * Destroy an incoming request from a remote peer
151  *
152  * @param incoming remote request to destroy
153  */
154 static void
155 destroy_incoming (struct Incoming *incoming)
156 {
157   if (NULL != incoming->mq)
158   {
159     GNUNET_MQ_destroy (incoming->mq);
160     incoming->mq = NULL;
161   }
162   if (NULL != incoming->socket)
163   {
164     GNUNET_STREAM_close (incoming->socket);
165     incoming->socket = NULL;
166   }
167   GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
168   GNUNET_free (incoming);
169 }
170
171
172 /**
173  * Handle a request for a set operation from
174  * another peer.
175  *
176  * @param cls the incoming socket
177  * @param mh the message
178  */
179 static void
180 handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
181 {
182   struct Incoming *incoming = cls;
183   const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
184   struct GNUNET_MQ_Message *mqm;
185   struct RequestMessage *cmsg;
186   struct Listener *listener;
187   const struct GNUNET_MessageHeader *context_msg;
188
189   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got operation request\n");
190
191   if (ntohs (mh->size) < sizeof *msg)
192   {
193     GNUNET_break (0);
194     destroy_incoming (incoming);
195     return;
196   }
197   else if (ntohs (mh->size) == sizeof *msg)
198   {
199     context_msg = NULL;
200   }
201   else
202   {
203     context_msg = &msg[1].header;
204     if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size))
205     {
206       /* size of context message is invalid */
207       GNUNET_break (0);
208       destroy_incoming (incoming);
209       return;
210     }
211   }
212   /* find the appropriate listener */
213   for (listener = listeners_head;
214        listener != NULL;
215        listener = listener->next)
216   {
217     if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
218          (htons (msg->operation) != listener->operation) )
219       continue;
220     mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
221     if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg))
222     {
223       /* FIXME: disconnect the peer */
224       GNUNET_MQ_discard (mqm);
225       GNUNET_break (0);
226       return;
227     }
228     incoming->request_id = request_id++;
229     cmsg->request_id = htonl (incoming->request_id);
230     GNUNET_MQ_send (listener->client_mq, mqm);
231     return;
232   }
233 }
234
235
236 /**
237  * Called when a client wants to create a new set.
238  *
239  * @param cls unused
240  * @param client client that sent the message
241  * @param m message sent by the client
242  */
243 static void
244 handle_client_create (void *cls,
245                       struct GNUNET_SERVER_Client *client,
246                       const struct GNUNET_MessageHeader *m)
247 {
248   struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
249   struct Set *set;
250
251   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n");
252
253   if (NULL != get_set (client))
254   {
255     GNUNET_break (0);
256     GNUNET_SERVER_client_disconnect (client);
257     return;
258   }
259
260   set = GNUNET_new (struct Set);
261
262   switch (ntohs (msg->operation))
263   {
264     case GNUNET_SET_OPERATION_INTERSECTION:
265       /* FIXME: cfuchs */
266       GNUNET_assert (0);
267       break;
268     case GNUNET_SET_OPERATION_UNION:
269       set = _GSS_union_set_create ();
270       break;
271     default:
272       GNUNET_free (set);
273       GNUNET_break (0);
274       GNUNET_SERVER_client_disconnect (client);
275       return;
276   }
277
278   set->client = client;
279   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
280   GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
281   
282   GNUNET_SERVER_receive_done (client, GNUNET_OK);
283 }
284
285
286 /**
287  * Called when a client wants to create a new listener.
288  *
289  * @param cls unused
290  * @param client client that sent the message
291  * @param m message sent by the client
292  */
293 static void
294 handle_client_listen (void *cls,
295                       struct GNUNET_SERVER_Client *client,
296                       const struct GNUNET_MessageHeader *m)
297 {
298   struct ListenMessage *msg = (struct ListenMessage *) m;
299   struct Listener *listener;
300   
301   if (NULL != get_listener (client))
302   {
303     GNUNET_break (0);
304     GNUNET_SERVER_client_disconnect (client);
305     return;
306   }
307   
308   listener = GNUNET_new (struct Listener);
309   listener->app_id = msg->app_id;
310   listener->operation = msg->operation;
311   GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
312
313   GNUNET_SERVER_receive_done (client, GNUNET_OK);
314 }
315
316
317 /**
318  * Called when a client wants to remove an element
319  * from the set it inhabits.
320  *
321  * @param cls unused
322  * @param client client that sent the message
323  * @param m message sent by the client
324  */
325 static void
326 handle_client_remove (void *cls,
327                       struct GNUNET_SERVER_Client *client,
328                       const struct GNUNET_MessageHeader *m)
329 {
330   struct Set *set;
331
332   set = get_set (client);
333   if (NULL == set)
334   {
335     GNUNET_break (0);
336     _GSS_client_disconnect (client);
337     return;
338   }
339   switch (set->operation)
340   {
341     case GNUNET_SET_OPERATION_UNION:
342       _GSS_union_add ((struct ElementMessage *) m, set);
343     case GNUNET_SET_OPERATION_INTERSECTION:
344       /* FIXME: cfuchs */
345       break;
346     default:
347       GNUNET_assert (0);
348       break;
349   }
350
351   GNUNET_SERVER_receive_done (client, GNUNET_OK);
352 }
353
354
355 /**
356  * Called when a client wants to add an element to a
357  * set it inhabits.
358  *
359  * @param cls unused
360  * @param client client that sent the message
361  * @param m message sent by the client
362  */
363 static void
364 handle_client_add (void *cls,
365                       struct GNUNET_SERVER_Client *client,
366                       const struct GNUNET_MessageHeader *m)
367 {
368   struct Set *set;
369
370   set = get_set (client);
371   if (NULL == set)
372   {
373     GNUNET_break (0);
374     _GSS_client_disconnect (client);
375     return;
376   }
377   switch (set->operation)
378   {
379     case GNUNET_SET_OPERATION_UNION:
380       _GSS_union_remove ((struct ElementMessage *) m, set);
381     case GNUNET_SET_OPERATION_INTERSECTION:
382       /* FIXME: cfuchs */
383       break;
384     default:
385       GNUNET_assert (0);
386       break;
387   }
388
389   GNUNET_SERVER_receive_done (client, GNUNET_OK);
390 }
391
392
393 /**
394  * Called when a client wants to evaluate a set operation with another peer.
395  *
396  * @param cls unused
397  * @param client client that sent the message
398  * @param m message sent by the client
399  */
400 static void
401 handle_client_evaluate (void *cls,
402                         struct GNUNET_SERVER_Client *client,
403                         const struct GNUNET_MessageHeader *m)
404 {
405   struct Set *set;
406
407   set = get_set (client);
408   if (NULL == set)
409   {
410     GNUNET_break (0);
411     _GSS_client_disconnect (client);
412     return;
413   }
414
415
416   switch (set->operation)
417   {
418     case GNUNET_SET_OPERATION_INTERSECTION:
419       /* FIXME: cfuchs */
420       break;
421     case GNUNET_SET_OPERATION_UNION:
422       _GSS_union_evaluate ((struct EvaluateMessage *) m, set);
423       break;
424     default:
425       GNUNET_assert (0);
426       break;
427   }
428
429   GNUNET_SERVER_receive_done (client, GNUNET_OK);
430 }
431
432
433 /**
434  * Handle a cancel request from a client.
435  *
436  * @param cls unused
437  * @param client the client
438  * @param m the cancel message
439  */
440 static void
441 handle_client_cancel (void *cls,
442                       struct GNUNET_SERVER_Client *client,
443                       const struct GNUNET_MessageHeader *m)
444 {
445   /* FIXME: implement */
446   GNUNET_SERVER_receive_done (client, GNUNET_OK);
447 }
448
449
450 /**
451  * Handle an ack from a client.
452  *
453  * @param cls unused
454  * @param client the client
455  * @param m the message
456  */
457 static void
458 handle_client_ack (void *cls,
459                    struct GNUNET_SERVER_Client *client,
460                    const struct GNUNET_MessageHeader *m)
461 {
462   /* FIXME: implement */
463   GNUNET_SERVER_receive_done (client, GNUNET_OK);
464 }
465
466
467 /**
468  * Handle a request from the client to accept
469  * a set operation that came from a remote peer.
470  *
471  * @param cls unused
472  * @param client the client
473  * @param mh the message
474  */
475 static void
476 handle_client_accept (void *cls,
477                       struct GNUNET_SERVER_Client *client,
478                       const struct GNUNET_MessageHeader *mh)
479 {
480   struct Set *set;
481   struct Incoming *incoming;
482   struct AcceptMessage *msg = (struct AcceptMessage *) mh;
483
484   set = get_set (client);
485
486   if (NULL == set)
487   {
488     GNUNET_break (0);
489     _GSS_client_disconnect (client);
490     return;
491   }
492
493   incoming = get_incoming (ntohl (msg->request_id));
494
495   if ( (NULL == incoming) ||
496        (incoming->operation != set->operation) )
497   {
498     GNUNET_break (0);
499     _GSS_client_disconnect (client);
500     return;
501   }
502
503   switch (set->operation)
504   {
505     case GNUNET_SET_OPERATION_INTERSECTION:
506       /* FIXME: cfuchs*/
507       GNUNET_assert (0);
508       break;
509     case GNUNET_SET_OPERATION_UNION:
510       _GSS_union_accept (msg, set, incoming);
511       break;
512     default:
513       GNUNET_assert (0);
514       break;
515   }
516   /* FIXME: destroy incoming */
517
518   GNUNET_SERVER_receive_done (client, GNUNET_OK);
519 }
520
521
522 /**
523  * Functions of this type are called upon new stream connection from other peers
524  * or upon binding error which happen when the app_port given in
525  * GNUNET_STREAM_listen() is already taken.
526  *
527  * @param cls the closure from GNUNET_STREAM_listen
528  * @param socket the socket representing the stream; NULL on binding error
529  * @param initiator the identity of the peer who wants to establish a stream
530  *            with us; NULL on binding error
531  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
532  *             stream (the socket will be invalid after the call)
533  */
534 static int
535 stream_listen_cb (void *cls,
536                   struct GNUNET_STREAM_Socket *socket,
537                   const struct GNUNET_PeerIdentity *initiator)
538 {
539   struct Incoming *incoming;
540   static const struct GNUNET_MQ_Handler handlers[] = {
541     {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
542     GNUNET_MQ_HANDLERS_END
543   };
544
545   if (NULL == socket)
546   {
547     GNUNET_break (0);
548     return GNUNET_SYSERR;
549   }
550
551   incoming = GNUNET_new (struct Incoming);
552   incoming->peer = *initiator;
553   incoming->socket = socket;
554   incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket, handlers, incoming);
555   /* FIXME: timeout for peers that only connect but don't send anything */
556   GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
557   return GNUNET_OK;
558 }
559
560
561 /**
562  * Called to clean up, after a shutdown has been requested.
563  *
564  * @param cls closure
565  * @param tc context information (why was this task triggered now)
566  */
567 static void
568 shutdown_task (void *cls,
569                const struct GNUNET_SCHEDULER_TaskContext *tc)
570 {
571   if (NULL != stream_listen_socket)
572   {
573     GNUNET_STREAM_listen_close (stream_listen_socket);
574     stream_listen_socket = NULL;
575   }
576
577   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
578 }
579
580
581 /**
582  * Function called by the service's run
583  * method to run service-specific setup code.
584  *
585  * @param cls closure
586  * @param server the initialized server
587  * @param cfg configuration to use
588  */
589 static void
590 run (void *cls, struct GNUNET_SERVER_Handle *server,
591      const struct GNUNET_CONFIGURATION_Handle *cfg)
592 {
593   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
594     {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
595     {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
596     {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
597     {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
598     {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
599     {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
600     {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
601     {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
602     {NULL, NULL, 0, 0}
603   };
604
605   configuration = cfg;
606   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
607   GNUNET_SERVER_add_handlers (server, server_handlers);
608   stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
609                                                &stream_listen_cb, NULL,
610                                                GNUNET_STREAM_OPTION_END);
611 }
612
613
614 /**
615  * The main function for the set service.
616  *
617  * @param argc number of arguments from the command line
618  * @param argv command line arguments
619  * @return 0 ok, 1 on error
620  */
621 int
622 main (int argc, char *const *argv)
623 {
624   int ret;
625   ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
626   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
627   return (GNUNET_OK == ret) ? 0 : 1;
628 }
629