glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_server.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file fs/gnunet-service-fs_cadet_server.c
18  * @brief non-anonymous file-transfer
19  * @author Christian Grothoff
20  *
21  * TODO:
22  * - PORT is set to old application type, unsure if we should keep
23  *   it that way (fine for now)
24  */
25 #include "platform.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet-service-fs.h"
32 #include "gnunet-service-fs_indexing.h"
33 #include "gnunet-service-fs_cadet.h"
34
35 /**
36  * After how long do we termiante idle connections?
37  */
38 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
39
40
41 /**
42  * A message in the queue to be written to the cadet.
43  */
44 struct WriteQueueItem
45 {
46   /**
47    * Kept in a DLL.
48    */
49   struct WriteQueueItem *next;
50
51   /**
52    * Kept in a DLL.
53    */
54   struct WriteQueueItem *prev;
55
56   /**
57    * Number of bytes of payload, allocated at the end of this struct.
58    */
59   size_t msize;
60 };
61
62
63 /**
64  * Information we keep around for each active cadeting client.
65  */
66 struct CadetClient
67 {
68   /**
69    * DLL
70    */
71   struct CadetClient *next;
72
73   /**
74    * DLL
75    */
76   struct CadetClient *prev;
77
78   /**
79    * Channel for communication.
80    */
81   struct GNUNET_CADET_Channel *channel;
82
83   /**
84    * Head of write queue.
85    */
86   struct WriteQueueItem *wqi_head;
87
88   /**
89    * Tail of write queue.
90    */
91   struct WriteQueueItem *wqi_tail;
92
93   /**
94    * Current active request to the datastore, if we have one pending.
95    */
96   struct GNUNET_DATASTORE_QueueEntry *qe;
97
98   /**
99    * Task that is scheduled to asynchronously terminate the connection.
100    */
101   struct GNUNET_SCHEDULER_Task * terminate_task;
102
103   /**
104    * Task that is scheduled to terminate idle connections.
105    */
106   struct GNUNET_SCHEDULER_Task * timeout_task;
107
108   /**
109    * Size of the last write that was initiated.
110    */
111   size_t reply_size;
112
113 };
114
115
116 /**
117  * Listen port for incoming requests.
118  */
119 static struct GNUNET_CADET_Port *cadet_port;
120
121 /**
122  * Head of DLL of cadet clients.
123  */
124 static struct CadetClient *sc_head;
125
126 /**
127  * Tail of DLL of cadet clients.
128  */
129 static struct CadetClient *sc_tail;
130
131 /**
132  * Number of active cadet clients in the 'sc_*'-DLL.
133  */
134 static unsigned int sc_count;
135
136 /**
137  * Maximum allowed number of cadet clients.
138  */
139 static unsigned long long sc_count_max;
140
141
142
143 /**
144  * Task run to asynchronously terminate the cadet due to timeout.
145  *
146  * @param cls the 'struct CadetClient'
147  */
148 static void
149 timeout_cadet_task (void *cls)
150 {
151   struct CadetClient *sc = cls;
152   struct GNUNET_CADET_Channel *tun;
153
154   sc->timeout_task = NULL;
155   tun = sc->channel;
156   sc->channel = NULL;
157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
158               "Timeout for inactive cadet client %p\n",
159               sc);
160   GNUNET_CADET_channel_destroy (tun);
161 }
162
163
164 /**
165  * Reset the timeout for the cadet client (due to activity).
166  *
167  * @param sc client handle to reset timeout for
168  */
169 static void
170 refresh_timeout_task (struct CadetClient *sc)
171 {
172   if (NULL != sc->timeout_task)
173     GNUNET_SCHEDULER_cancel (sc->timeout_task);
174   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
175                                                    &timeout_cadet_task,
176                                                    sc);
177 }
178
179
180 /**
181  * Check if we are done with the write queue, and if so tell CADET
182  * that we are ready to read more.
183  *
184  * @param cls where to process the write queue
185  */
186 static void
187 continue_writing (void *cls)
188 {
189   struct CadetClient *sc = cls;
190   struct GNUNET_MQ_Handle *mq;
191
192   mq = GNUNET_CADET_get_mq (sc->channel);
193   if (0 != GNUNET_MQ_get_length (mq))
194   {
195     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
196                 "Write pending, waiting for it to complete\n");
197     return;
198   }
199   refresh_timeout_task (sc);
200   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
201               "Finished processing cadet request from client %p, ready to receive the next one\n",
202               sc);
203   GNUNET_CADET_receive_done (sc->channel);
204 }
205
206
207 /**
208  * Process a datum that was stored in the datastore.
209  *
210  * @param cls closure with the `struct CadetClient` which sent the query
211  * @param key key for the content
212  * @param size number of bytes in @a data
213  * @param data content stored
214  * @param type type of the content
215  * @param priority priority of the content
216  * @param anonymity anonymity-level for the content
217  * @param replication replication-level for the content
218  * @param expiration expiration time for the content
219  * @param uid unique identifier for the datum;
220  *        maybe 0 if no unique identifier is available
221  */
222 static void
223 handle_datastore_reply (void *cls,
224                         const struct GNUNET_HashCode *key,
225                         size_t size,
226                         const void *data,
227                         enum GNUNET_BLOCK_Type type,
228                         uint32_t priority,
229                         uint32_t anonymity,
230                         uint32_t replication,
231                         struct GNUNET_TIME_Absolute expiration,
232                         uint64_t uid)
233 {
234   struct CadetClient *sc = cls;
235   size_t msize = size + sizeof (struct CadetReplyMessage);
236   struct GNUNET_MQ_Envelope *env;
237   struct CadetReplyMessage *srm;
238
239   sc->qe = NULL;
240   if (NULL == data)
241   {
242     /* no result, this should not really happen, as for
243        non-anonymous routing only peers that HAVE the
244        answers should be queried; OTOH, this is not a
245        hard error as we might have had the answer in the
246        past and the user might have unindexed it. Hence
247        we log at level "INFO" for now. */
248     if (NULL == key)
249     {
250       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
251                   "Have no answer and the query was NULL\n");
252     }
253     else
254     {
255       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
256                   "Have no answer for query `%s'\n",
257                   GNUNET_h2s (key));
258     }
259     GNUNET_STATISTICS_update (GSF_stats,
260                               gettext_noop ("# queries received via CADET not answered"),
261                               1,
262                               GNUNET_NO);
263     continue_writing (sc);
264     return;
265   }
266   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
267   {
268     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
269                 "Performing on-demand encoding for query %s\n",
270                 GNUNET_h2s (key));
271     if (GNUNET_OK !=
272         GNUNET_FS_handle_on_demand_block (key,
273                                     size,
274                                     data,
275                                     type,
276                                     priority,
277                                     anonymity,
278                                     replication,
279                                     expiration,
280                                     uid,
281                                     &handle_datastore_reply,
282                                     sc))
283     {
284       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285                   "On-demand encoding request failed\n");
286       continue_writing (sc);
287     }
288     return;
289   }
290   if (msize > GNUNET_MAX_MESSAGE_SIZE)
291   {
292     GNUNET_break (0);
293     continue_writing (sc);
294     return;
295   }
296   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298               "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
299               (unsigned int) size,
300               (unsigned int) type,
301               GNUNET_h2s (key),
302               sc);
303   env = GNUNET_MQ_msg_extra (srm,
304                              size,
305                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
306   srm->type = htonl (type);
307   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
308   GNUNET_memcpy (&srm[1],
309                  data,
310                  size);
311   GNUNET_MQ_notify_sent (env,
312                          &continue_writing,
313                          sc);
314   GNUNET_STATISTICS_update (GSF_stats,
315                             gettext_noop ("# Blocks transferred via cadet"),
316                             1,
317                             GNUNET_NO);
318   GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
319                   env);
320 }
321
322
323 /**
324  * Functions with this signature are called whenever a
325  * complete query message is received.
326  *
327  * @param cls closure with the `struct CadetClient`
328  * @param sqm the actual message
329  */
330 static void
331 handle_request (void *cls,
332                 const struct CadetQueryMessage *sqm)
333 {
334   struct CadetClient *sc = cls;
335
336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337               "Received query for `%s' via cadet from client %p\n",
338               GNUNET_h2s (&sqm->query),
339               sc);
340   GNUNET_STATISTICS_update (GSF_stats,
341                             gettext_noop ("# queries received via cadet"),
342                             1,
343                             GNUNET_NO);
344   refresh_timeout_task (sc);
345   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
346                                      0 /* next_uid */,
347                                      false /* random */,
348                                      &sqm->query,
349                                      ntohl (sqm->type),
350                                      0 /* priority */,
351                                      GSF_datastore_queue_size,
352                                      &handle_datastore_reply,
353                                      sc);
354   if (NULL == sc->qe)
355   {
356     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357                 "Queueing request with datastore failed (queue full?)\n");
358     continue_writing (sc);
359   }
360 }
361
362
363 /**
364  * Functions of this type are called upon new cadet connection from other peers.
365  *
366  * @param cls the closure from GNUNET_CADET_connect
367  * @param channel the channel representing the cadet
368  * @param initiator the identity of the peer who wants to establish a cadet
369  *            with us; NULL on binding error
370  * @return initial channel context (our `struct CadetClient`)
371  */
372 static void *
373 connect_cb (void *cls,
374             struct GNUNET_CADET_Channel *channel,
375             const struct GNUNET_PeerIdentity *initiator)
376 {
377   struct CadetClient *sc;
378
379   GNUNET_assert (NULL != channel);
380   if (sc_count >= sc_count_max)
381   {
382     GNUNET_STATISTICS_update (GSF_stats,
383                               gettext_noop ("# cadet client connections rejected"),
384                               1,
385                               GNUNET_NO);
386     GNUNET_CADET_channel_destroy (channel);
387     return NULL;
388   }
389   GNUNET_STATISTICS_update (GSF_stats,
390                             gettext_noop ("# cadet connections active"),
391                             1,
392                             GNUNET_NO);
393   sc = GNUNET_new (struct CadetClient);
394   sc->channel = channel;
395   GNUNET_CONTAINER_DLL_insert (sc_head,
396                                sc_tail,
397                                sc);
398   sc_count++;
399   refresh_timeout_task (sc);
400   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401               "Accepting inbound cadet connection from `%s' as client %p\n",
402               GNUNET_i2s (initiator),
403               sc);
404   return sc;
405 }
406
407
408 /**
409  * Function called by cadet when a client disconnects.
410  * Cleans up our `struct CadetClient` of that channel.
411  *
412  * @param cls  our `struct CadetClient`
413  * @param channel channel of the disconnecting client
414  * @param channel_ctx
415  */
416 static void
417 disconnect_cb (void *cls,
418                const struct GNUNET_CADET_Channel *channel)
419 {
420   struct CadetClient *sc = cls;
421   struct WriteQueueItem *wqi;
422
423   if (NULL == sc)
424     return;
425   sc->channel = NULL;
426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427               "Terminating cadet connection with client %p\n",
428               sc);
429   GNUNET_STATISTICS_update (GSF_stats,
430                             gettext_noop ("# cadet connections active"), -1,
431                             GNUNET_NO);
432   if (NULL != sc->terminate_task)
433     GNUNET_SCHEDULER_cancel (sc->terminate_task);
434   if (NULL != sc->timeout_task)
435     GNUNET_SCHEDULER_cancel (sc->timeout_task);
436   if (NULL != sc->qe)
437     GNUNET_DATASTORE_cancel (sc->qe);
438   while (NULL != (wqi = sc->wqi_head))
439   {
440     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
441                                  sc->wqi_tail,
442                                  wqi);
443     GNUNET_free (wqi);
444   }
445   GNUNET_CONTAINER_DLL_remove (sc_head,
446                                sc_tail,
447                                sc);
448   sc_count--;
449   GNUNET_free (sc);
450 }
451
452
453 /**
454  * Function called whenever an MQ-channel's transmission window size changes.
455  *
456  * The first callback in an outgoing channel will be with a non-zero value
457  * and will mean the channel is connected to the destination.
458  *
459  * For an incoming channel it will be called immediately after the
460  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
461  *
462  * @param cls Channel closure.
463  * @param channel Connection to the other end (henceforth invalid).
464  * @param window_size New window size. If the is more messages than buffer size
465  *                    this value will be negative..
466  */
467 static void
468 window_change_cb (void *cls,
469                   const struct GNUNET_CADET_Channel *channel,
470                   int window_size)
471 {
472   /* FIXME: could do flow control here... */
473 }
474
475
476 /**
477  * Initialize subsystem for non-anonymous file-sharing.
478  */
479 void
480 GSF_cadet_start_server ()
481 {
482   struct GNUNET_MQ_MessageHandler handlers[] = {
483     GNUNET_MQ_hd_fixed_size (request,
484                              GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
485                              struct CadetQueryMessage,
486                              NULL),
487     GNUNET_MQ_handler_end ()
488   };
489   struct GNUNET_HashCode port;
490
491   if (GNUNET_YES !=
492       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
493                                              "fs",
494                                              "MAX_CADET_CLIENTS",
495                                              &sc_count_max))
496     return;
497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498               "Initializing cadet FS server with a limit of %llu connections\n",
499               sc_count_max);
500   cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
501   cadet_handle = GNUNET_CADET_connect (GSF_cfg);
502   GNUNET_assert (NULL != cadet_handle);
503   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
504                       strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
505                       &port);
506   cadet_port = GNUNET_CADET_open_port (cadet_handle,
507                                        &port,
508                                        &connect_cb,
509                                        NULL,
510                                        &window_change_cb,
511                                        &disconnect_cb,
512                                        handlers);
513 }
514
515
516 /**
517  * Shutdown subsystem for non-anonymous file-sharing.
518  */
519 void
520 GSF_cadet_stop_server ()
521 {
522   GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
523                                          &GSF_cadet_release_clients,
524                                          NULL);
525   GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
526   cadet_map = NULL;
527   if (NULL != cadet_port)
528   {
529     GNUNET_CADET_close_port (cadet_port);
530     cadet_port = NULL;
531   }
532   if (NULL != cadet_handle)
533   {
534     GNUNET_CADET_disconnect (cadet_handle);
535     cadet_handle = NULL;
536   }
537   GNUNET_assert (NULL == sc_head);
538   GNUNET_assert (0 == sc_count);
539 }
540
541 /* end of gnunet-service-fs_cadet.c */