started implementing union operation for set
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "set_protocol.h"
30
31
32 /**
33  * Configuration of our local peer.
34  */
35 const struct GNUNET_CONFIGURATION_Handle *configuration;
36
37 /**
38  * Socket listening for other peers via stream.
39  */
40 static struct GNUNET_STREAM_ListenSocket *stream_listen_socket;
41
42 /**
43  * Sets are held in a doubly linked list.
44  */
45 static struct Set *sets_head;
46
47 /**
48  * Sets are held in a doubly linked list.
49  */
50 static struct Set *sets_tail;
51
52 /**
53  * Listeners are held in a doubly linked list.
54  */
55 static struct Listener *listeners_head;
56
57 /**
58  * Listeners are held in a doubly linked list.
59  */
60 static struct Listener *listeners_tail;
61
62 /**
63  * Incoming sockets from remote peers are
64  * held in a doubly linked list.
65  */
66 static struct Incoming *incoming_head;
67
68 /**
69  * Incoming sockets from remote peers are
70  * held in a doubly linked list.
71  */
72 static struct Incoming *incoming_tail;
73
74 /**
75  * Counter for allocating unique request IDs for clients.
76  */
77 static uint32_t request_id = 1;
78
79
80 /**
81  * Disconnect a client and free all resources
82  * that the client allocated (e.g. Sets or Listeners)
83  *
84  * @param client the client to disconnect
85  */
86 void
87 client_disconnect (struct GNUNET_SERVER_Client *client)
88 {
89   /* FIXME: clean up any data structures belonging to the client */
90   GNUNET_SERVER_client_disconnect (client);
91 }
92
93
94 /**
95  * Get set that is owned by the client, if any.
96  *
97  * @param client client to look for
98  * @return set that the client owns, NULL if the client
99  *         does not own a set
100  */
101 static struct Set *
102 get_set (struct GNUNET_SERVER_Client *client)
103 {
104   struct Set *set;
105   for (set = sets_head; NULL != set; set = set->next)
106     if (set->client == client)
107       return set;
108   return NULL;
109 }
110
111
112 /**
113  * Get the listener associated to a client, if any.
114  *
115  * @param client the client
116  * @return listener associated with the client, NULL
117  *         if there isn't any
118  */
119 static struct Listener *
120 get_listener (struct GNUNET_SERVER_Client *client)
121 {
122   struct Listener *listener;
123   for (listener = listeners_head; NULL != listener; listener = listener->next)
124     if (listener->client == client)
125       return listener;
126   return NULL;
127 }
128
129 /**
130  * Get the incoming socket associated with the given id
131  *
132  * @param id id to look for
133  * @return the incoming socket associated with the id,
134  *         or NULL if there is none
135  */
136 static struct Incoming *
137 get_incoming (uint32_t id)
138 {
139   struct Incoming *incoming;
140   for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
141     if (incoming->request_id == id)
142       return incoming;
143   return NULL;
144 }
145
146 static void
147 destroy_incoming (struct Incoming *incoming)
148 {
149   if (NULL != incoming->mq)
150   {
151     GNUNET_MQ_destroy (incoming->mq);
152     incoming->mq = NULL;
153   }
154   if (NULL != incoming->socket)
155   {
156     GNUNET_STREAM_close (incoming->socket);
157     incoming->socket = NULL;
158   }
159   GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
160   GNUNET_free (incoming);
161 }
162
163
164 /**
165  * Handle a request for a set operation for
166  * another peer.
167  *
168  * @param cls the incoming socket
169  * @param mh the message
170  */
171 static void
172 handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
173 {
174   struct Incoming *incoming = cls;
175   const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
176   struct GNUNET_MQ_Message *mqm;
177   struct RequestMessage *cmsg;
178   struct Listener *listener;
179   const struct GNUNET_MessageHeader *context_msg;
180
181   if (ntohs (mh->size) < sizeof *msg)
182   {
183     GNUNET_break (0);
184     destroy_incoming (incoming);
185     return;
186   }
187   else if (ntohs (mh->size) == sizeof *msg)
188   {
189     context_msg = NULL;
190   }
191   else
192   {
193     context_msg = &msg[1].header;
194     if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size))
195     {
196       /* size of context message is invalid */
197       GNUNET_break (0);
198       destroy_incoming (incoming);
199       return;
200     }
201   }
202
203   for (listener = listeners_head; listener != NULL; listener = listener->next)
204   {
205     if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
206          (htons (msg->operation) != listener->operation) )
207       continue;
208     mqm = GNUNET_MQ_msg_concat (cmsg, context_msg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
209     incoming->request_id = request_id++;
210     cmsg->request_id = htonl (incoming->request_id);
211     GNUNET_MQ_send (listener->client_mq, mqm);
212     return;
213   }
214 }
215
216
217 /**
218  * Called when a client wants to create a new set.
219  *
220  * @param cls unused
221  * @param client client that sent the message
222  * @param m message sent by the client
223  */
224 static void
225 handle_client_create (void *cls,
226                       struct GNUNET_SERVER_Client *client,
227                       const struct GNUNET_MessageHeader *m)
228 {
229   struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
230   struct Set *set;
231
232   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n");
233
234   if (NULL != get_set (client))
235   {
236     GNUNET_break (0);
237     GNUNET_SERVER_client_disconnect (client);
238     return;
239   }
240
241   set = GNUNET_new (struct Set);
242
243   switch (ntohs (msg->operation))
244   {
245     case GNUNET_SET_OPERATION_INTERSECTION:
246       /* FIXME: cfuchs */
247       GNUNET_assert (0);
248       break;
249     case GNUNET_SET_OPERATION_UNION:
250       set = union_set_create ();
251       break;
252     default:
253       GNUNET_free (set);
254       GNUNET_break (0);
255       GNUNET_SERVER_client_disconnect (client);
256       return;
257   }
258
259   set->client = client;
260   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
261   GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
262
263   GNUNET_SERVER_receive_done (client, GNUNET_OK);
264 }
265
266
267 /**
268  * Called when a client wants to create a new set.
269  *
270  * @param cls unused
271  * @param client client that sent the message
272  * @param m message sent by the client
273  */
274 static void
275 handle_client_listen (void *cls,
276                       struct GNUNET_SERVER_Client *client,
277                       const struct GNUNET_MessageHeader *m)
278 {
279   struct ListenMessage *msg = (struct ListenMessage *) m;
280   struct Listener *listener;
281   
282   if (NULL != get_listener (client))
283   {
284     GNUNET_break (0);
285     GNUNET_SERVER_client_disconnect (client);
286     return;
287   }
288   
289   listener = GNUNET_new (struct Listener);
290   listener->app_id = msg->app_id;
291   listener->operation = msg->operation;
292   GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
293 }
294
295
296 /**
297  * Called when a client wants to create a new set.
298  *
299  * @param cls unused
300  * @param client client that sent the message
301  * @param m message sent by the client
302  */
303 static void
304 handle_client_add (void *cls,
305                       struct GNUNET_SERVER_Client *client,
306                       const struct GNUNET_MessageHeader *m)
307 {
308   struct Set *set;
309
310   set = get_set (client);
311   if (NULL == set)
312   {
313     GNUNET_break (0);
314     client_disconnect (client);
315     return;
316   }
317   switch (set->operation)
318   {
319     case GNUNET_SET_OPERATION_UNION:
320       union_add (set, (struct ElementMessage *) m);
321       break;
322     case GNUNET_SET_OPERATION_INTERSECTION:
323       /* FIXME: cfuchs */
324       break;
325     default:
326       GNUNET_assert (0);
327       break;
328   }
329 }
330
331
332 /**
333  * Called when a client wants to evaluate a set operation with another peer.
334  *
335  * @param cls unused
336  * @param client client that sent the message
337  * @param m message sent by the client
338  */
339 static void
340 handle_client_evaluate (void *cls,
341                         struct GNUNET_SERVER_Client *client,
342                         const struct GNUNET_MessageHeader *m)
343 {
344   struct Set *set;
345   struct EvaluateMessage *msg = (struct EvaluateMessage *) m;
346   struct EvaluateOperation *eo;
347
348   set = get_set (client);
349
350   if (NULL == set)
351   {
352     GNUNET_break (0);
353     client_disconnect (client);
354     return;
355   }
356
357   eo = GNUNET_new (struct EvaluateOperation);
358   eo->peer = msg->peer;
359   eo->app_id = msg->app_id;
360   eo->request_id = msg->request_id;
361   eo->context_msg = GNUNET_copy_message (&msg[1].header);
362   eo->set = set;
363
364   switch (set->operation)
365   {
366     case GNUNET_SET_OPERATION_INTERSECTION:
367       /* FIXME: cfuchs */
368       break;
369     case GNUNET_SET_OPERATION_UNION:
370       union_evaluate (eo);
371       break;
372     default:
373       GNUNET_assert (0);
374       break;
375   }
376 }
377
378
379 /**
380  * Handle a cancel request from a client.
381  *
382  * @param cls unused
383  * @param client the client
384  * @param m the cancel message
385  */
386 static void
387 handle_client_cancel (void *cls,
388                       struct GNUNET_SERVER_Client *client,
389                       const struct GNUNET_MessageHeader *m)
390 {
391   /* FIXME: implement */
392 }
393
394
395 /**
396  * Handle an ack from a client.
397  *
398  * @param cls unused
399  * @param client the client
400  * @param m the message
401  */
402 static void
403 handle_client_ack (void *cls,
404                    struct GNUNET_SERVER_Client *client,
405                    const struct GNUNET_MessageHeader *m)
406 {
407   /* FIXME: implement */
408 }
409
410
411 /**
412  * Handle a request from the client to accept
413  * a set operation.
414  *
415  * @param cls unused
416  * @param client the client
417  * @param m the message
418  */
419 static void
420 handle_client_accept (void *cls,
421                       struct GNUNET_SERVER_Client *client,
422                       const struct GNUNET_MessageHeader *m)
423 {
424   struct AcceptMessage *msg = (struct AcceptMessage *) m;
425   struct Set *set;
426   struct Incoming *incoming;
427   struct EvaluateOperation *eo;
428
429   set = get_set (client);
430
431   if (NULL == set)
432   {
433     GNUNET_break (0);
434     client_disconnect (client);
435     return;
436   }
437
438   incoming = get_incoming (ntohl (msg->request_id));
439
440   if ( (NULL == incoming) ||
441        (incoming->operation != set->operation) )
442   {
443     GNUNET_break (0);
444     client_disconnect (client);
445     return;
446   }
447
448   eo = GNUNET_new (struct EvaluateOperation);
449   eo->peer = incoming->peer;
450   eo->app_id = incoming->app_id;
451   eo->request_id = msg->request_id;
452   eo->set = set;
453
454   switch (set->operation)
455   {
456     case GNUNET_SET_OPERATION_INTERSECTION:
457       /* FIXME: cfuchs*/
458       GNUNET_assert (0);
459       break;
460     case GNUNET_SET_OPERATION_UNION:
461       union_accept (eo, incoming);
462       break;
463     default:
464       GNUNET_assert (0);
465       break;
466   }
467 }
468
469
470 /**
471  * Functions of this type are called upon new stream connection from other peers
472  * or upon binding error which happen when the app_port given in
473  * GNUNET_STREAM_listen() is already taken.
474  *
475  * @param cls the closure from GNUNET_STREAM_listen
476  * @param socket the socket representing the stream; NULL on binding error
477  * @param initiator the identity of the peer who wants to establish a stream
478  *            with us; NULL on binding error
479  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
480  *             stream (the socket will be invalid after the call)
481  */
482 static int
483 stream_listen_cb (void *cls,
484                   struct GNUNET_STREAM_Socket *socket,
485                   const struct GNUNET_PeerIdentity *initiator)
486 {
487   struct Incoming *incoming;
488   static const struct GNUNET_MQ_Handler handlers[] = {
489     {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
490     GNUNET_MQ_HANDLERS_END
491   };
492
493   if (NULL == socket)
494   {
495     GNUNET_break (0);
496     return GNUNET_SYSERR;
497   }
498
499   incoming = GNUNET_new (struct Incoming);
500   incoming->peer = *initiator;
501   incoming->socket = socket;
502   incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket, handlers, incoming);
503   /* FIXME: timeout for peers that only connect but don't send anything */
504   GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
505   return GNUNET_OK;
506 }
507
508
509 /**
510  * Called to clean up, after a shutdown has been requested.
511  *
512  * @param cls closure
513  * @param tc context information (why was this task triggered now)
514  */
515 static void
516 shutdown_task (void *cls,
517                const struct GNUNET_SCHEDULER_TaskContext *tc)
518 {
519   if (NULL != stream_listen_socket)
520   {
521     GNUNET_STREAM_listen_close (stream_listen_socket);
522     stream_listen_socket = NULL;
523   } 
524
525   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
526 }
527
528
529 /**
530  * Function called by the service's run
531  * method to run service-specific setup code.
532  *
533  * @param cls closure
534  * @param server the initialized server
535  * @param cfg configuration to use
536  */
537 static void
538 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg)
539 {
540   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
541     {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
542     {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
543     {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
544     {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
545     {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
546     {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
547     {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
548     {NULL, NULL, 0, 0}
549   };
550
551   configuration = cfg;
552   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
553   GNUNET_SERVER_add_handlers (server, server_handlers);
554   stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
555                                                &stream_listen_cb, NULL,
556                                                GNUNET_STREAM_OPTION_END);
557 }
558
559
560 /**
561  * The main function for the set service.
562  *
563  * @param argc number of arguments from the command line
564  * @param argv command line arguments
565  * @return 0 ok, 1 on error
566  */
567 int
568 main (int argc, char *const *argv)
569 {
570   int ret;
571   ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
572   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
573   return (GNUNET_OK == ret) ? 0 : 1;
574 }
575
576