-missing file, more cleanup
[oweals/gnunet.git] / src / fs / gnunet-service-fs_mesh_server.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_mesh.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_mesh_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_mesh.h"
39
40 /**
41  * After how long do we termiante idle connections?
42  */
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
44
45
46 /**
47  * A message in the queue to be written to the mesh.
48  */
49 struct WriteQueueItem
50 {
51   /**
52    * Kept in a DLL.
53    */
54   struct WriteQueueItem *next;
55
56   /**
57    * Kept in a DLL.
58    */
59   struct WriteQueueItem *prev;
60
61   /**
62    * Number of bytes of payload, allocated at the end of this struct.
63    */
64   size_t msize;
65 };
66
67
68 /**
69  * Information we keep around for each active meshing client.
70  */
71 struct MeshClient
72 {
73   /**
74    * DLL
75    */ 
76   struct MeshClient *next;
77
78   /**
79    * DLL
80    */ 
81   struct MeshClient *prev;
82
83   /**
84    * Tunnel for communication.
85    */ 
86   struct GNUNET_MESH_Tunnel *tunnel;
87
88   /**
89    * Handle for active write operation, or NULL.
90    */ 
91   struct GNUNET_MESH_TransmitHandle *wh;
92
93   /**
94    * Head of write queue.
95    */
96   struct WriteQueueItem *wqi_head;
97
98   /**
99    * Tail of write queue.
100    */
101   struct WriteQueueItem *wqi_tail;
102   
103   /**
104    * Current active request to the datastore, if we have one pending.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *qe;
107
108   /**
109    * Task that is scheduled to asynchronously terminate the connection.
110    */
111   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
112
113   /**
114    * Task that is scheduled to terminate idle connections.
115    */
116   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
117
118   /**
119    * Size of the last write that was initiated.
120    */ 
121   size_t reply_size;
122
123 };
124
125
126 /**
127  * Listen tunnel for incoming requests.
128  */
129 static struct GNUNET_MESH_Handle *listen_tunnel;
130
131 /**
132  * Head of DLL of mesh clients.
133  */ 
134 static struct MeshClient *sc_head;
135
136 /**
137  * Tail of DLL of mesh clients.
138  */ 
139 static struct MeshClient *sc_tail;
140
141 /**
142  * Number of active mesh clients in the 'sc_*'-DLL.
143  */
144 static unsigned int sc_count;
145
146 /**
147  * Maximum allowed number of mesh clients.
148  */
149 static unsigned long long sc_count_max;
150
151
152
153 /**
154  * Task run to asynchronously terminate the mesh due to timeout.
155  *
156  * @param cls the 'struct MeshClient'
157  * @param tc scheduler context
158  */ 
159 static void
160 timeout_mesh_task (void *cls,
161                      const struct GNUNET_SCHEDULER_TaskContext *tc)
162 {
163   struct MeshClient *sc = cls;
164   struct GNUNET_MESH_Tunnel *tun;
165
166   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
167   tun = sc->tunnel;
168   sc->tunnel = NULL;
169   GNUNET_MESH_tunnel_destroy (tun);
170 }
171
172
173 /**
174  * Reset the timeout for the mesh client (due to activity).
175  *
176  * @param sc client handle to reset timeout for
177  */
178 static void
179 refresh_timeout_task (struct MeshClient *sc)
180 {
181   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
182     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
183   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
184                                                    &timeout_mesh_task,
185                                                    sc);
186 }
187
188
189 /**
190  * We're done handling a request from a client, read the next one.
191  *
192  * @param sc client to continue reading requests from
193  */
194 static void
195 continue_reading (struct MeshClient *sc)
196 {
197   refresh_timeout_task (sc);
198   GNUNET_MESH_receive_done (sc->tunnel);
199 }
200
201
202 /**
203  * Transmit the next entry from the write queue.
204  *
205  * @param sc where to process the write queue
206  */
207 static void
208 continue_writing (struct MeshClient *sc);
209
210
211 /**
212  * Send a reply now, mesh is ready.
213  *
214  * @param cls closure with the struct MeshClient which sent the query
215  * @param size number of bytes available in 'buf'
216  * @param buf where to write the message
217  * @return number of bytes written to 'buf'
218  */
219 static size_t
220 write_continuation (void *cls,
221                     size_t size,
222                     void *buf)
223 {
224   struct MeshClient *sc = cls;
225   struct GNUNET_MESH_Tunnel *tun;
226   struct WriteQueueItem *wqi;
227   size_t ret;
228
229   sc->wh = NULL;
230   if (NULL == (wqi = sc->wqi_head))
231   {
232     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
233                 "Write queue empty, reading more requests\n");
234     return 0;
235   }
236   if (0 == size)
237   {
238     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
239                 "Transmission of reply failed, terminating mesh\n");
240     tun = sc->tunnel;
241     sc->tunnel = NULL;
242     GNUNET_MESH_tunnel_destroy (tun);
243     return 0;
244   }
245   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
246                                sc->wqi_tail,
247                                wqi);
248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249               "Transmitted %u byte reply via mesh\n",
250               (unsigned int) size);
251   GNUNET_STATISTICS_update (GSF_stats,
252                             gettext_noop ("# Blocks transferred via mesh"), 1,
253                             GNUNET_NO);
254   memcpy (buf, &wqi[1], ret = wqi->msize);
255   GNUNET_free (wqi);
256   continue_writing (sc);
257   return ret;
258 }
259
260
261 /**
262  * Transmit the next entry from the write queue.
263  *
264  * @param sc where to process the write queue
265  */
266 static void
267 continue_writing (struct MeshClient *sc)
268 {
269   struct WriteQueueItem *wqi;
270   struct GNUNET_MESH_Tunnel *tun;
271
272   if (NULL != sc->wh)
273   {
274     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
275                 "Write pending, waiting for it to complete\n");
276     return; /* write already pending */
277   }
278   if (NULL == (wqi = sc->wqi_head))
279   {
280     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281                 "Write queue empty, reading more requests\n");
282     continue_reading (sc);
283     return;
284   }
285   sc->wh = GNUNET_MESH_notify_transmit_ready (sc->tunnel, GNUNET_NO,
286                                               GNUNET_TIME_UNIT_FOREVER_REL,
287                                               wqi->msize,                                     
288                                               &write_continuation,
289                                               sc);
290   if (NULL == sc->wh)
291   {
292     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
293                 "Write failed; terminating mesh\n");
294     tun = sc->tunnel;
295     sc->tunnel = NULL;
296     GNUNET_MESH_tunnel_destroy (tun);
297     return;
298   }
299 }
300
301
302 /**
303  * Process a datum that was stored in the datastore.
304  *
305  * @param cls closure with the struct MeshClient which sent the query
306  * @param key key for the content
307  * @param size number of bytes in data
308  * @param data content stored
309  * @param type type of the content
310  * @param priority priority of the content
311  * @param anonymity anonymity-level for the content
312  * @param expiration expiration time for the content
313  * @param uid unique identifier for the datum;
314  *        maybe 0 if no unique identifier is available
315  */
316 static void 
317 handle_datastore_reply (void *cls,
318                         const struct GNUNET_HashCode *key,
319                         size_t size, const void *data,
320                         enum GNUNET_BLOCK_Type type,
321                         uint32_t priority,
322                         uint32_t anonymity,
323                         struct GNUNET_TIME_Absolute
324                         expiration, uint64_t uid)
325 {
326   struct MeshClient *sc = cls;
327   size_t msize = size + sizeof (struct MeshReplyMessage);
328   struct WriteQueueItem *wqi;
329   struct MeshReplyMessage *srm;
330
331   sc->qe = NULL;
332   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
333   {
334     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335                 "Performing on-demand encoding for query %s\n",
336                 GNUNET_h2s (key));
337     if (GNUNET_OK !=
338         GNUNET_FS_handle_on_demand_block (key,
339                                           size, data, type,
340                                           priority, anonymity,
341                                           expiration, uid,
342                                           &handle_datastore_reply,
343                                           sc))
344     {
345       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346                   "On-demand encoding request failed\n");
347       continue_writing (sc);
348     }
349     return;
350   }
351   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
352   {
353     GNUNET_break (0);
354     continue_writing (sc);
355     return;
356   }
357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
358               "Starting transmission of %u byte reply for query `%s' via mesh\n",
359               (unsigned int) size,
360               GNUNET_h2s (key));
361   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
362   wqi->msize = msize;
363   srm = (struct MeshReplyMessage *) &wqi[1];
364   srm->header.size = htons ((uint16_t) msize);
365   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
366   srm->type = htonl (type);
367   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
368   memcpy (&srm[1], data, size);
369   sc->reply_size = msize;
370   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
371                                sc->wqi_tail,
372                                wqi);
373   continue_writing (sc);
374 }
375
376
377 /**
378  * Functions with this signature are called whenever a
379  * complete query message is received.
380  *
381  * Do not call GNUNET_SERVER_mst_destroy in callback
382  *
383  * @param cls closure with the 'struct MeshClient'
384  * @param tunnel tunnel handle
385  * @param tunnel_ctx tunnel context
386  * @param message the actual message
387  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
388  */
389 static int
390 request_cb (void *cls,
391             struct GNUNET_MESH_Tunnel *tunnel,
392             void **tunnel_ctx,
393             const struct GNUNET_MessageHeader *message)
394 {
395   struct MeshClient *sc = *tunnel_ctx;
396   const struct MeshQueryMessage *sqm;
397
398   sqm = (const struct MeshQueryMessage *) message;
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Received query for `%s' via mesh from client %p\n",
401               GNUNET_h2s (&sqm->query),
402               sc);
403   GNUNET_STATISTICS_update (GSF_stats,
404                             gettext_noop ("# queries received via mesh"), 1,
405                             GNUNET_NO);
406   refresh_timeout_task (sc);
407   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
408                                      0,
409                                      &sqm->query,
410                                      ntohl (sqm->type),
411                                      0 /* priority */, 
412                                      GSF_datastore_queue_size,
413                                      GNUNET_TIME_UNIT_FOREVER_REL,
414                                      &handle_datastore_reply, sc);
415   if (NULL == sc->qe)
416   {
417     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418                 "Queueing request with datastore failed (queue full?)\n");
419     continue_writing (sc);
420   }
421   return GNUNET_OK;
422 }
423
424
425 /**
426  * Functions of this type are called upon new mesh connection from other peers.
427  *
428  * @param cls the closure from GNUNET_MESH_connect
429  * @param tunnel the tunnel representing the mesh
430  * @param initiator the identity of the peer who wants to establish a mesh
431  *            with us; NULL on binding error
432  * @param port mesh port used for the incoming connection
433  * @return initial tunnel context (our 'struct MeshClient')
434  */
435 static void *
436 accept_cb (void *cls,
437            struct GNUNET_MESH_Tunnel *tunnel,
438            const struct GNUNET_PeerIdentity *initiator,
439            uint32_t port)
440 {
441   struct MeshClient *sc;
442
443   GNUNET_assert (NULL != tunnel);
444   if (sc_count >= sc_count_max)
445   {
446     GNUNET_STATISTICS_update (GSF_stats,
447                               gettext_noop ("# mesh client connections rejected"), 1,
448                               GNUNET_NO);
449     GNUNET_MESH_tunnel_destroy (tunnel);
450     return NULL;
451   }
452   GNUNET_STATISTICS_update (GSF_stats,
453                             gettext_noop ("# mesh connections active"), 1,
454                             GNUNET_NO);
455   sc = GNUNET_new (struct MeshClient);
456   sc->tunnel = tunnel;
457   GNUNET_CONTAINER_DLL_insert (sc_head,
458                                sc_tail,
459                                sc);
460   sc_count++;
461   refresh_timeout_task (sc);
462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463               "Accepting inbound mesh connection from `%s' as client %p\n",
464               GNUNET_i2s (initiator),
465               sc);
466   return sc;
467 }
468
469
470 /**
471  * Function called by mesh when a client disconnects.
472  * Cleans up our 'struct MeshClient' of that tunnel.
473  *
474  * @param cls NULL
475  * @param tunnel tunnel of the disconnecting client
476  * @param tunnel_ctx our 'struct MeshClient' 
477  */
478 static void
479 cleaner_cb (void *cls,
480             const struct GNUNET_MESH_Tunnel *tunnel,
481             void *tunnel_ctx)
482 {
483   struct MeshClient *sc = tunnel_ctx;
484   struct WriteQueueItem *wqi;
485
486   if (NULL == sc)
487     return;
488   sc->tunnel = NULL;
489   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490               "Terminating mesh connection with client %p\n",
491               sc);
492   GNUNET_STATISTICS_update (GSF_stats,
493                             gettext_noop ("# mesh connections active"), -1,
494                             GNUNET_NO);
495   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
496     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
497   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
498     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
499   if (NULL != sc->wh)
500     GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
501   if (NULL != sc->qe)
502     GNUNET_DATASTORE_cancel (sc->qe);
503   while (NULL != (wqi = sc->wqi_head))
504   {
505     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
506                                  sc->wqi_tail,
507                                  wqi);
508     GNUNET_free (wqi);
509   }
510   GNUNET_CONTAINER_DLL_remove (sc_head,
511                                sc_tail,
512                                sc);
513   sc_count--;
514   GNUNET_free (sc);
515 }
516
517
518 /**
519  * Initialize subsystem for non-anonymous file-sharing.
520  */
521 void
522 GSF_mesh_start_server ()
523 {
524   static const struct GNUNET_MESH_MessageHandler handlers[] = {
525     { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)},
526     { NULL, 0, 0 }
527   };
528   static const uint32_t ports[] = {
529     GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
530     0
531   };
532
533   if (GNUNET_YES !=
534       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
535                                              "fs",
536                                              "MAX_MESH_CLIENTS",
537                                              &sc_count_max))
538     return;
539   listen_tunnel = GNUNET_MESH_connect (GSF_cfg,
540                                        NULL,
541                                        &accept_cb,
542                                        &cleaner_cb,
543                                        handlers,
544                                        ports);
545 }
546
547
548 /**
549  * Shutdown subsystem for non-anonymous file-sharing.
550  */
551 void
552 GSF_mesh_stop_server ()
553 {
554   if (NULL != listen_tunnel)
555   {
556     GNUNET_MESH_disconnect (listen_tunnel);
557     listen_tunnel = NULL;
558   }
559   GNUNET_assert (NULL == sc_head);
560   GNUNET_assert (0 == sc_count);
561 }
562
563 /* end of gnunet-service-fs_mesh.c */