-splitting mesh into server and client parts
[oweals/gnunet.git] / src / fs / gnunet-service-fs_mesh_server.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_mesh.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - MESH2 API doesn't allow flow control for server yet (needed!)
28  * - likely need to register clean up handler with mesh to handle
29  *   client disconnect (likely leaky right now)
30  * - server is optional, currently client code will NPE if we have
31  *   no server, again MESH2 API requirement forcing this for now
32  * - message handlers are symmetric for client/server, should be
33  *   separated (currently clients can get requests and servers can
34  *   handle answers, not good)
35  * - code is entirely untested
36  * - might have overlooked a few possible simplifications
37  * - PORT is set to old application type, unsure if we should keep
38  *   it that way (fine for now)
39  */
40 #include "platform.h"
41 #include "gnunet_constants.h"
42 #include "gnunet_util_lib.h"
43 #include "gnunet_mesh_service.h"
44 #include "gnunet_protocols.h"
45 #include "gnunet_applications.h"
46 #include "gnunet-service-fs.h"
47 #include "gnunet-service-fs_indexing.h"
48 #include "gnunet-service-fs_mesh.h"
49
50 /**
51  * After how long do we termiante idle connections?
52  */
53 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
54
55
56 /**
57  * A message in the queue to be written to the mesh.
58  */
59 struct WriteQueueItem
60 {
61   /**
62    * Kept in a DLL.
63    */
64   struct WriteQueueItem *next;
65
66   /**
67    * Kept in a DLL.
68    */
69   struct WriteQueueItem *prev;
70
71   /**
72    * Number of bytes of payload, allocated at the end of this struct.
73    */
74   size_t msize;
75 };
76
77
78 /**
79  * Information we keep around for each active meshing client.
80  */
81 struct MeshClient
82 {
83   /**
84    * DLL
85    */ 
86   struct MeshClient *next;
87
88   /**
89    * DLL
90    */ 
91   struct MeshClient *prev;
92
93   /**
94    * Socket for communication.
95    */ 
96   struct GNUNET_MESH_Tunnel *socket;
97
98   /**
99    * Handle for active write operation, or NULL.
100    */ 
101   struct GNUNET_MESH_TransmitHandle *wh;
102
103   /**
104    * Head of write queue.
105    */
106   struct WriteQueueItem *wqi_head;
107
108   /**
109    * Tail of write queue.
110    */
111   struct WriteQueueItem *wqi_tail;
112   
113   /**
114    * Current active request to the datastore, if we have one pending.
115    */
116   struct GNUNET_DATASTORE_QueueEntry *qe;
117
118   /**
119    * Task that is scheduled to asynchronously terminate the connection.
120    */
121   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
122
123   /**
124    * Task that is scheduled to terminate idle connections.
125    */
126   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
127
128   /**
129    * Size of the last write that was initiated.
130    */ 
131   size_t reply_size;
132
133 };
134
135
136 /**
137  * Listen socket for incoming requests.
138  */
139 static struct GNUNET_MESH_Handle *listen_socket;
140
141 /**
142  * Head of DLL of mesh clients.
143  */ 
144 static struct MeshClient *sc_head;
145
146 /**
147  * Tail of DLL of mesh clients.
148  */ 
149 static struct MeshClient *sc_tail;
150
151 /**
152  * Number of active mesh clients in the 'sc_*'-DLL.
153  */
154 static unsigned int sc_count;
155
156 /**
157  * Maximum allowed number of mesh clients.
158  */
159 static unsigned long long sc_count_max;
160
161
162
163 /* ********************* server-side code ************************* */
164
165
166 /**
167  * We're done with a particular client, clean up.
168  *
169  * @param sc client to clean up
170  */
171 static void
172 terminate_mesh (struct MeshClient *sc)
173 {
174   struct WriteQueueItem *wqi;
175
176   fprintf (stderr,
177            "terminate mesh called for %p\n",
178            sc);
179   GNUNET_STATISTICS_update (GSF_stats,
180                             gettext_noop ("# mesh connections active"), -1,
181                             GNUNET_NO);
182   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
183     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
184   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
185     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
186   if (NULL != sc->wh)
187     GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
188   if (NULL != sc->qe)
189     GNUNET_DATASTORE_cancel (sc->qe);
190   while (NULL != (wqi = sc->wqi_head))
191   {
192     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
193                                  sc->wqi_tail,
194                                  wqi);
195     GNUNET_free (wqi);
196   }
197   GNUNET_CONTAINER_DLL_remove (sc_head,
198                                sc_tail,
199                                sc);
200   sc_count--;
201   GNUNET_free (sc);
202 }
203
204
205 /**
206  * Task run to asynchronously terminate the mesh due to timeout.
207  *
208  * @param cls the 'struct MeshClient'
209  * @param tc scheduler context
210  */ 
211 static void
212 timeout_mesh_task (void *cls,
213                      const struct GNUNET_SCHEDULER_TaskContext *tc)
214 {
215   struct MeshClient *sc = cls;
216   struct GNUNET_MESH_Tunnel *tun;
217
218   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
219   tun = sc->socket;
220   sc->socket = NULL;
221   GNUNET_MESH_tunnel_destroy (tun);
222 }
223
224
225 /**
226  * Reset the timeout for the mesh client (due to activity).
227  *
228  * @param sc client handle to reset timeout for
229  */
230 static void
231 refresh_timeout_task (struct MeshClient *sc)
232 {
233   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
234     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
235   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
236                                                    &timeout_mesh_task,
237                                                    sc);
238 }
239
240
241 /**
242  * We're done handling a request from a client, read the next one.
243  *
244  * @param sc client to continue reading requests from
245  */
246 static void
247 continue_reading (struct MeshClient *sc)
248 {
249   refresh_timeout_task (sc);
250   GNUNET_MESH_receive_done (sc->socket);
251 }
252
253
254 /**
255  * Transmit the next entry from the write queue.
256  *
257  * @param sc where to process the write queue
258  */
259 static void
260 continue_writing (struct MeshClient *sc);
261
262
263 /**
264  * Send a reply now, mesh is ready.
265  *
266  * @param cls closure with the struct MeshClient which sent the query
267  * @param size number of bytes available in 'buf'
268  * @param buf where to write the message
269  * @return number of bytes written to 'buf'
270  */
271 static size_t
272 write_continuation (void *cls,
273                     size_t size,
274                     void *buf)
275 {
276   struct MeshClient *sc = cls;
277   struct WriteQueueItem *wqi;
278   size_t ret;
279
280   sc->wh = NULL;
281   if (NULL == (wqi = sc->wqi_head))
282   {
283     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
284                 "Write queue empty, reading more requests\n");
285     return 0;
286   }
287   if (0 == size)
288   {
289     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290                 "Transmission of reply failed, terminating mesh\n");
291     terminate_mesh (sc);    
292     return 0;
293   }
294   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
295                                sc->wqi_tail,
296                                wqi);
297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298               "Transmitted %u byte reply via mesh\n",
299               (unsigned int) size);
300   GNUNET_STATISTICS_update (GSF_stats,
301                             gettext_noop ("# Blocks transferred via mesh"), 1,
302                             GNUNET_NO);
303   memcpy (buf, &wqi[1], ret = wqi->msize);
304   GNUNET_free (wqi);
305   continue_writing (sc);
306   return ret;
307 }
308
309
310 /**
311  * Transmit the next entry from the write queue.
312  *
313  * @param sc where to process the write queue
314  */
315 static void
316 continue_writing (struct MeshClient *sc)
317 {
318   struct WriteQueueItem *wqi;
319
320   if (NULL != sc->wh)
321   {
322     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
323                 "Write pending, waiting for it to complete\n");
324     return; /* write already pending */
325   }
326   if (NULL == (wqi = sc->wqi_head))
327   {
328     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329                 "Write queue empty, reading more requests\n");
330     continue_reading (sc);
331     return;
332   }
333   sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
334                                               GNUNET_TIME_UNIT_FOREVER_REL,
335                                               wqi->msize,                                     
336                                               &write_continuation,
337                                               sc);
338   if (NULL == sc->wh)
339   {
340     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341                 "Write failed; terminating mesh\n");
342     terminate_mesh (sc);
343     return;
344   }
345 }
346
347
348 /**
349  * Process a datum that was stored in the datastore.
350  *
351  * @param cls closure with the struct MeshClient which sent the query
352  * @param key key for the content
353  * @param size number of bytes in data
354  * @param data content stored
355  * @param type type of the content
356  * @param priority priority of the content
357  * @param anonymity anonymity-level for the content
358  * @param expiration expiration time for the content
359  * @param uid unique identifier for the datum;
360  *        maybe 0 if no unique identifier is available
361  */
362 static void 
363 handle_datastore_reply (void *cls,
364                         const struct GNUNET_HashCode * key,
365                         size_t size, const void *data,
366                         enum GNUNET_BLOCK_Type type,
367                         uint32_t priority,
368                         uint32_t anonymity,
369                         struct GNUNET_TIME_Absolute
370                         expiration, uint64_t uid)
371 {
372   struct MeshClient *sc = cls;
373   size_t msize = size + sizeof (struct MeshReplyMessage);
374   struct WriteQueueItem *wqi;
375   struct MeshReplyMessage *srm;
376
377   sc->qe = NULL;
378   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
379   {
380     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381                 "Performing on-demand encoding\n");
382     if (GNUNET_OK !=
383         GNUNET_FS_handle_on_demand_block (key,
384                                           size, data, type,
385                                           priority, anonymity,
386                                           expiration, uid,
387                                           &handle_datastore_reply,
388                                           sc))
389     {
390       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391                   "On-demand encoding request failed\n");
392       continue_writing (sc);
393     }
394     return;
395   }
396   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
397   {
398     GNUNET_break (0);
399     continue_writing (sc);
400     return;
401   }
402   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403               "Starting transmission of %u byte reply for query `%s' via mesh\n",
404               (unsigned int) size,
405               GNUNET_h2s (key));
406   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
407   wqi->msize = msize;
408   srm = (struct MeshReplyMessage *) &wqi[1];
409   srm->header.size = htons ((uint16_t) msize);
410   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
411   srm->type = htonl (type);
412   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
413   memcpy (&srm[1], data, size);
414   sc->reply_size = msize;
415   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
416                                sc->wqi_tail,
417                                wqi);
418   continue_writing (sc);
419 }
420
421
422 /**
423  * Functions with this signature are called whenever a
424  * complete query message is received.
425  *
426  * Do not call GNUNET_SERVER_mst_destroy in callback
427  *
428  * @param cls closure with the 'struct MeshClient'
429  * @param tunnel tunnel handle
430  * @param tunnel_ctx tunnel context
431  * @param message the actual message
432  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
433  */
434 static int
435 request_cb (void *cls,
436             struct GNUNET_MESH_Tunnel *tunnel,
437             void **tunnel_ctx,
438             const struct GNUNET_MessageHeader *message)
439 {
440   struct MeshClient *sc = *tunnel_ctx;
441   const struct MeshQueryMessage *sqm;
442
443   fprintf (stderr,
444            "Request gets %p\n", 
445            sc);
446   sqm = (const struct MeshQueryMessage *) message;
447   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448               "Received query for `%s' via mesh\n",
449               GNUNET_h2s (&sqm->query));
450   GNUNET_STATISTICS_update (GSF_stats,
451                             gettext_noop ("# queries received via mesh"), 1,
452                             GNUNET_NO);
453   refresh_timeout_task (sc);
454   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
455                                      0,
456                                      &sqm->query,
457                                      ntohl (sqm->type),
458                                      0 /* priority */, 
459                                      GSF_datastore_queue_size,
460                                      GNUNET_TIME_UNIT_FOREVER_REL,
461                                      &handle_datastore_reply, sc);
462   if (NULL == sc->qe)
463   {
464     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465                 "Queueing request with datastore failed (queue full?)\n");
466     continue_writing (sc);
467   }
468   return GNUNET_OK;
469 }
470
471
472 /**
473  * Functions of this type are called upon new mesh connection from other peers.
474  *
475  * @param cls the closure from GNUNET_MESH_connect
476  * @param socket the socket representing the mesh
477  * @param initiator the identity of the peer who wants to establish a mesh
478  *            with us; NULL on binding error
479  * @param port mesh port used for the incoming connection
480  * @return initial tunnel context (our 'struct MeshClient')
481  */
482 static void *
483 accept_cb (void *cls,
484            struct GNUNET_MESH_Tunnel *socket,
485            const struct GNUNET_PeerIdentity *initiator,
486            uint32_t port)
487 {
488   struct MeshClient *sc;
489
490   GNUNET_assert (NULL != socket);
491   if (sc_count >= sc_count_max)
492   {
493     GNUNET_STATISTICS_update (GSF_stats,
494                               gettext_noop ("# mesh client connections rejected"), 1,
495                               GNUNET_NO);
496     GNUNET_MESH_tunnel_destroy (socket);
497     return NULL;
498   }
499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500               "Accepting inbound mesh connection from `%s'\n",
501               GNUNET_i2s (initiator));
502   GNUNET_STATISTICS_update (GSF_stats,
503                             gettext_noop ("# mesh connections active"), 1,
504                             GNUNET_NO);
505   sc = GNUNET_new (struct MeshClient);
506   sc->socket = socket;
507   GNUNET_CONTAINER_DLL_insert (sc_head,
508                                sc_tail,
509                                sc);
510   sc_count++;
511   refresh_timeout_task (sc);
512   fprintf (stderr,
513            "Accept returns %p\n", 
514            sc);
515   return sc;
516 }
517
518
519 /**
520  * Function called by mesh when a client disconnects.
521  * Cleans up our 'struct MeshClient' of that tunnel.
522  *
523  * @param cls NULL
524  * @param tunnel tunnel of the disconnecting client
525  * @param tunnel_ctx our 'struct MeshClient' 
526  */
527 static void
528 cleaner_cb (void *cls,
529             const struct GNUNET_MESH_Tunnel *tunnel,
530             void *tunnel_ctx)
531 {
532   struct MeshClient *sc = tunnel_ctx;
533
534   fprintf (stderr,
535            "Cleaner called with %p\n", 
536            sc);
537   if (NULL != sc)
538     terminate_mesh (sc);
539 }
540
541
542 /**
543  * Initialize subsystem for non-anonymous file-sharing.
544  */
545 void
546 GSF_mesh_start_server ()
547 {
548   static const struct GNUNET_MESH_MessageHandler handlers[] = {
549     { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)},
550     { NULL, 0, 0 }
551   };
552   static const uint32_t ports[] = {
553     GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
554     0
555   };
556
557   if (GNUNET_YES !=
558       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
559                                              "fs",
560                                              "MAX_MESH_CLIENTS",
561                                              &sc_count_max))
562     return;
563   listen_socket = GNUNET_MESH_connect (GSF_cfg,
564                                        NULL,
565                                        &accept_cb,
566                                        &cleaner_cb,
567                                        handlers,
568                                        ports);
569 }
570
571
572 /**
573  * Shutdown subsystem for non-anonymous file-sharing.
574  */
575 void
576 GSF_mesh_stop_server ()
577 {
578   struct MeshClient *sc;
579
580   while (NULL != (sc = sc_head))
581     terminate_mesh (sc);
582   if (NULL != listen_socket)
583   {
584     GNUNET_MESH_disconnect (listen_socket);
585     listen_socket = NULL;
586   }
587 }
588
589 /* end of gnunet-service-fs_mesh.c */