15faaeef0e65fb6a0b802b1ede8d586298ce842f
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_server.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file fs/gnunet-service-fs_cadet_server.c
21  * @brief non-anonymous file-transfer
22  * @author Christian Grothoff
23  *
24  * TODO:
25  * - PORT is set to old application type, unsure if we should keep
26  *   it that way (fine for now)
27  */
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_applications.h"
34 #include "gnunet-service-fs.h"
35 #include "gnunet-service-fs_indexing.h"
36 #include "gnunet-service-fs_cadet.h"
37
38 /**
39  * After how long do we termiante idle connections?
40  */
41 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
42
43
44 /**
45  * A message in the queue to be written to the cadet.
46  */
47 struct WriteQueueItem
48 {
49   /**
50    * Kept in a DLL.
51    */
52   struct WriteQueueItem *next;
53
54   /**
55    * Kept in a DLL.
56    */
57   struct WriteQueueItem *prev;
58
59   /**
60    * Number of bytes of payload, allocated at the end of this struct.
61    */
62   size_t msize;
63 };
64
65
66 /**
67  * Information we keep around for each active cadeting client.
68  */
69 struct CadetClient
70 {
71   /**
72    * DLL
73    */
74   struct CadetClient *next;
75
76   /**
77    * DLL
78    */
79   struct CadetClient *prev;
80
81   /**
82    * Channel for communication.
83    */
84   struct GNUNET_CADET_Channel *channel;
85
86   /**
87    * Head of write queue.
88    */
89   struct WriteQueueItem *wqi_head;
90
91   /**
92    * Tail of write queue.
93    */
94   struct WriteQueueItem *wqi_tail;
95
96   /**
97    * Current active request to the datastore, if we have one pending.
98    */
99   struct GNUNET_DATASTORE_QueueEntry *qe;
100
101   /**
102    * Task that is scheduled to asynchronously terminate the connection.
103    */
104   struct GNUNET_SCHEDULER_Task * terminate_task;
105
106   /**
107    * Task that is scheduled to terminate idle connections.
108    */
109   struct GNUNET_SCHEDULER_Task * timeout_task;
110
111   /**
112    * Size of the last write that was initiated.
113    */
114   size_t reply_size;
115
116 };
117
118
119 /**
120  * Listen port for incoming requests.
121  */
122 static struct GNUNET_CADET_Port *cadet_port;
123
124 /**
125  * Head of DLL of cadet clients.
126  */
127 static struct CadetClient *sc_head;
128
129 /**
130  * Tail of DLL of cadet clients.
131  */
132 static struct CadetClient *sc_tail;
133
134 /**
135  * Number of active cadet clients in the 'sc_*'-DLL.
136  */
137 static unsigned int sc_count;
138
139 /**
140  * Maximum allowed number of cadet clients.
141  */
142 static unsigned long long sc_count_max;
143
144
145
146 /**
147  * Task run to asynchronously terminate the cadet due to timeout.
148  *
149  * @param cls the 'struct CadetClient'
150  */
151 static void
152 timeout_cadet_task (void *cls)
153 {
154   struct CadetClient *sc = cls;
155   struct GNUNET_CADET_Channel *tun;
156
157   sc->timeout_task = NULL;
158   tun = sc->channel;
159   sc->channel = NULL;
160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
161               "Timeout for inactive cadet client %p\n",
162               sc);
163   GNUNET_CADET_channel_destroy (tun);
164 }
165
166
167 /**
168  * Reset the timeout for the cadet client (due to activity).
169  *
170  * @param sc client handle to reset timeout for
171  */
172 static void
173 refresh_timeout_task (struct CadetClient *sc)
174 {
175   if (NULL != sc->timeout_task)
176     GNUNET_SCHEDULER_cancel (sc->timeout_task);
177   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
178                                                    &timeout_cadet_task,
179                                                    sc);
180 }
181
182
183 /**
184  * Check if we are done with the write queue, and if so tell CADET
185  * that we are ready to read more.
186  *
187  * @param cls where to process the write queue
188  */
189 static void
190 continue_writing (void *cls)
191 {
192   struct CadetClient *sc = cls;
193   struct GNUNET_MQ_Handle *mq;
194
195   mq = GNUNET_CADET_get_mq (sc->channel);
196   if (0 != GNUNET_MQ_get_length (mq))
197   {
198     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
199                 "Write pending, waiting for it to complete\n");
200     return;
201   }
202   refresh_timeout_task (sc);
203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
204               "Finished processing cadet request from client %p, ready to receive the next one\n",
205               sc);
206   GNUNET_CADET_receive_done (sc->channel);
207 }
208
209
210 /**
211  * Process a datum that was stored in the datastore.
212  *
213  * @param cls closure with the `struct CadetClient` which sent the query
214  * @param key key for the content
215  * @param size number of bytes in @a data
216  * @param data content stored
217  * @param type type of the content
218  * @param priority priority of the content
219  * @param anonymity anonymity-level for the content
220  * @param replication replication-level for the content
221  * @param expiration expiration time for the content
222  * @param uid unique identifier for the datum;
223  *        maybe 0 if no unique identifier is available
224  */
225 static void
226 handle_datastore_reply (void *cls,
227                         const struct GNUNET_HashCode *key,
228                         size_t size,
229                         const void *data,
230                         enum GNUNET_BLOCK_Type type,
231                         uint32_t priority,
232                         uint32_t anonymity,
233                         uint32_t replication,
234                         struct GNUNET_TIME_Absolute expiration,
235                         uint64_t uid)
236 {
237   struct CadetClient *sc = cls;
238   size_t msize = size + sizeof (struct CadetReplyMessage);
239   struct GNUNET_MQ_Envelope *env;
240   struct CadetReplyMessage *srm;
241
242   sc->qe = NULL;
243   if (NULL == data)
244   {
245     /* no result, this should not really happen, as for
246        non-anonymous routing only peers that HAVE the
247        answers should be queried; OTOH, this is not a
248        hard error as we might have had the answer in the
249        past and the user might have unindexed it. Hence
250        we log at level "INFO" for now. */
251     if (NULL == key)
252     {
253       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
254                   "Have no answer and the query was NULL\n");
255     }
256     else
257     {
258       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
259                   "Have no answer for query `%s'\n",
260                   GNUNET_h2s (key));
261     }
262     GNUNET_STATISTICS_update (GSF_stats,
263                               gettext_noop ("# queries received via CADET not answered"),
264                               1,
265                               GNUNET_NO);
266     continue_writing (sc);
267     return;
268   }
269   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
270   {
271     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
272                 "Performing on-demand encoding for query %s\n",
273                 GNUNET_h2s (key));
274     if (GNUNET_OK !=
275         GNUNET_FS_handle_on_demand_block (key,
276                                     size,
277                                     data,
278                                     type,
279                                     priority,
280                                     anonymity,
281                                     replication,
282                                     expiration,
283                                     uid,
284                                     &handle_datastore_reply,
285                                     sc))
286     {
287       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288                   "On-demand encoding request failed\n");
289       continue_writing (sc);
290     }
291     return;
292   }
293   if (msize > GNUNET_MAX_MESSAGE_SIZE)
294   {
295     GNUNET_break (0);
296     continue_writing (sc);
297     return;
298   }
299   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
300   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
301               "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
302               (unsigned int) size,
303               (unsigned int) type,
304               GNUNET_h2s (key),
305               sc);
306   env = GNUNET_MQ_msg_extra (srm,
307                              size,
308                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
309   srm->type = htonl (type);
310   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
311   GNUNET_memcpy (&srm[1],
312                  data,
313                  size);
314   GNUNET_MQ_notify_sent (env,
315                          &continue_writing,
316                          sc);
317   GNUNET_STATISTICS_update (GSF_stats,
318                             gettext_noop ("# Blocks transferred via cadet"),
319                             1,
320                             GNUNET_NO);
321   GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
322                   env);
323 }
324
325
326 /**
327  * Functions with this signature are called whenever a
328  * complete query message is received.
329  *
330  * @param cls closure with the `struct CadetClient`
331  * @param sqm the actual message
332  */
333 static void
334 handle_request (void *cls,
335                 const struct CadetQueryMessage *sqm)
336 {
337   struct CadetClient *sc = cls;
338
339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340               "Received query for `%s' via cadet from client %p\n",
341               GNUNET_h2s (&sqm->query),
342               sc);
343   GNUNET_STATISTICS_update (GSF_stats,
344                             gettext_noop ("# queries received via cadet"),
345                             1,
346                             GNUNET_NO);
347   refresh_timeout_task (sc);
348   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
349                                      0 /* next_uid */,
350                                      false /* random */,
351                                      &sqm->query,
352                                      ntohl (sqm->type),
353                                      0 /* priority */,
354                                      GSF_datastore_queue_size,
355                                      &handle_datastore_reply,
356                                      sc);
357   if (NULL == sc->qe)
358   {
359     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360                 "Queueing request with datastore failed (queue full?)\n");
361     continue_writing (sc);
362   }
363 }
364
365
366 /**
367  * Functions of this type are called upon new cadet connection from other peers.
368  *
369  * @param cls the closure from GNUNET_CADET_connect
370  * @param channel the channel representing the cadet
371  * @param initiator the identity of the peer who wants to establish a cadet
372  *            with us; NULL on binding error
373  * @return initial channel context (our `struct CadetClient`)
374  */
375 static void *
376 connect_cb (void *cls,
377             struct GNUNET_CADET_Channel *channel,
378             const struct GNUNET_PeerIdentity *initiator)
379 {
380   struct CadetClient *sc;
381
382   GNUNET_assert (NULL != channel);
383   if (sc_count >= sc_count_max)
384   {
385     GNUNET_STATISTICS_update (GSF_stats,
386                               gettext_noop ("# cadet client connections rejected"),
387                               1,
388                               GNUNET_NO);
389     GNUNET_CADET_channel_destroy (channel);
390     return NULL;
391   }
392   GNUNET_STATISTICS_update (GSF_stats,
393                             gettext_noop ("# cadet connections active"),
394                             1,
395                             GNUNET_NO);
396   sc = GNUNET_new (struct CadetClient);
397   sc->channel = channel;
398   GNUNET_CONTAINER_DLL_insert (sc_head,
399                                sc_tail,
400                                sc);
401   sc_count++;
402   refresh_timeout_task (sc);
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Accepting inbound cadet connection from `%s' as client %p\n",
405               GNUNET_i2s (initiator),
406               sc);
407   return sc;
408 }
409
410
411 /**
412  * Function called by cadet when a client disconnects.
413  * Cleans up our `struct CadetClient` of that channel.
414  *
415  * @param cls  our `struct CadetClient`
416  * @param channel channel of the disconnecting client
417  * @param channel_ctx
418  */
419 static void
420 disconnect_cb (void *cls,
421                const struct GNUNET_CADET_Channel *channel)
422 {
423   struct CadetClient *sc = cls;
424   struct WriteQueueItem *wqi;
425
426   if (NULL == sc)
427     return;
428   sc->channel = NULL;
429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
430               "Terminating cadet connection with client %p\n",
431               sc);
432   GNUNET_STATISTICS_update (GSF_stats,
433                             gettext_noop ("# cadet connections active"), -1,
434                             GNUNET_NO);
435   if (NULL != sc->terminate_task)
436     GNUNET_SCHEDULER_cancel (sc->terminate_task);
437   if (NULL != sc->timeout_task)
438     GNUNET_SCHEDULER_cancel (sc->timeout_task);
439   if (NULL != sc->qe)
440     GNUNET_DATASTORE_cancel (sc->qe);
441   while (NULL != (wqi = sc->wqi_head))
442   {
443     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
444                                  sc->wqi_tail,
445                                  wqi);
446     GNUNET_free (wqi);
447   }
448   GNUNET_CONTAINER_DLL_remove (sc_head,
449                                sc_tail,
450                                sc);
451   sc_count--;
452   GNUNET_free (sc);
453 }
454
455
456 /**
457  * Function called whenever an MQ-channel's transmission window size changes.
458  *
459  * The first callback in an outgoing channel will be with a non-zero value
460  * and will mean the channel is connected to the destination.
461  *
462  * For an incoming channel it will be called immediately after the
463  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
464  *
465  * @param cls Channel closure.
466  * @param channel Connection to the other end (henceforth invalid).
467  * @param window_size New window size. If the is more messages than buffer size
468  *                    this value will be negative..
469  */
470 static void
471 window_change_cb (void *cls,
472                   const struct GNUNET_CADET_Channel *channel,
473                   int window_size)
474 {
475   /* FIXME: could do flow control here... */
476 }
477
478
479 /**
480  * Initialize subsystem for non-anonymous file-sharing.
481  */
482 void
483 GSF_cadet_start_server ()
484 {
485   struct GNUNET_MQ_MessageHandler handlers[] = {
486     GNUNET_MQ_hd_fixed_size (request,
487                              GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
488                              struct CadetQueryMessage,
489                              NULL),
490     GNUNET_MQ_handler_end ()
491   };
492   struct GNUNET_HashCode port;
493
494   if (GNUNET_YES !=
495       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
496                                              "fs",
497                                              "MAX_CADET_CLIENTS",
498                                              &sc_count_max))
499     return;
500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501               "Initializing cadet FS server with a limit of %llu connections\n",
502               sc_count_max);
503   cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
504   cadet_handle = GNUNET_CADET_connect (GSF_cfg);
505   GNUNET_assert (NULL != cadet_handle);
506   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
507                       strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
508                       &port);
509   cadet_port = GNUNET_CADET_open_port (cadet_handle,
510                                        &port,
511                                        &connect_cb,
512                                        NULL,
513                                        &window_change_cb,
514                                        &disconnect_cb,
515                                        handlers);
516 }
517
518
519 /**
520  * Shutdown subsystem for non-anonymous file-sharing.
521  */
522 void
523 GSF_cadet_stop_server ()
524 {
525   GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
526                                          &GSF_cadet_release_clients,
527                                          NULL);
528   GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
529   cadet_map = NULL;
530   if (NULL != cadet_port)
531   {
532     GNUNET_CADET_close_port (cadet_port);
533     cadet_port = NULL;
534   }
535   if (NULL != cadet_handle)
536   {
537     GNUNET_CADET_disconnect (cadet_handle);
538     cadet_handle = NULL;
539   }
540   GNUNET_assert (NULL == sc_head);
541   GNUNET_assert (0 == sc_count);
542 }
543
544 /* end of gnunet-service-fs_cadet.c */