Move type check after initialization to make compiler happy
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_server.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cadet_server.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
39
40 /**
41  * After how long do we termiante idle connections?
42  */
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
44
45
46 /**
47  * A message in the queue to be written to the cadet.
48  */
49 struct WriteQueueItem
50 {
51   /**
52    * Kept in a DLL.
53    */
54   struct WriteQueueItem *next;
55
56   /**
57    * Kept in a DLL.
58    */
59   struct WriteQueueItem *prev;
60
61   /**
62    * Number of bytes of payload, allocated at the end of this struct.
63    */
64   size_t msize;
65 };
66
67
68 /**
69  * Information we keep around for each active cadeting client.
70  */
71 struct CadetClient
72 {
73   /**
74    * DLL
75    */
76   struct CadetClient *next;
77
78   /**
79    * DLL
80    */
81   struct CadetClient *prev;
82
83   /**
84    * Channel for communication.
85    */
86   struct GNUNET_CADET_Channel *channel;
87
88   /**
89    * Handle for active write operation, or NULL.
90    */
91   struct GNUNET_CADET_TransmitHandle *wh;
92
93   /**
94    * Head of write queue.
95    */
96   struct WriteQueueItem *wqi_head;
97
98   /**
99    * Tail of write queue.
100    */
101   struct WriteQueueItem *wqi_tail;
102
103   /**
104    * Current active request to the datastore, if we have one pending.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *qe;
107
108   /**
109    * Task that is scheduled to asynchronously terminate the connection.
110    */
111   struct GNUNET_SCHEDULER_Task * terminate_task;
112
113   /**
114    * Task that is scheduled to terminate idle connections.
115    */
116   struct GNUNET_SCHEDULER_Task * timeout_task;
117
118   /**
119    * Size of the last write that was initiated.
120    */
121   size_t reply_size;
122
123 };
124
125
126 /**
127  * Listen channel for incoming requests.
128  */
129 static struct GNUNET_CADET_Handle *listen_channel;
130
131 /**
132  * Head of DLL of cadet clients.
133  */
134 static struct CadetClient *sc_head;
135
136 /**
137  * Tail of DLL of cadet clients.
138  */
139 static struct CadetClient *sc_tail;
140
141 /**
142  * Number of active cadet clients in the 'sc_*'-DLL.
143  */
144 static unsigned int sc_count;
145
146 /**
147  * Maximum allowed number of cadet clients.
148  */
149 static unsigned long long sc_count_max;
150
151
152
153 /**
154  * Task run to asynchronously terminate the cadet due to timeout.
155  *
156  * @param cls the 'struct CadetClient'
157  */
158 static void
159 timeout_cadet_task (void *cls)
160 {
161   struct CadetClient *sc = cls;
162   struct GNUNET_CADET_Channel *tun;
163
164   sc->timeout_task = NULL;
165   tun = sc->channel;
166   sc->channel = NULL;
167   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
168               "Timeout for inactive cadet client %p\n",
169               sc);
170   GNUNET_CADET_channel_destroy (tun);
171 }
172
173
174 /**
175  * Reset the timeout for the cadet client (due to activity).
176  *
177  * @param sc client handle to reset timeout for
178  */
179 static void
180 refresh_timeout_task (struct CadetClient *sc)
181 {
182   if (NULL != sc->timeout_task)
183     GNUNET_SCHEDULER_cancel (sc->timeout_task);
184   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
185                                                    &timeout_cadet_task,
186                                                    sc);
187 }
188
189
190 /**
191  * We're done handling a request from a client, read the next one.
192  *
193  * @param sc client to continue reading requests from
194  */
195 static void
196 continue_reading (struct CadetClient *sc)
197 {
198   refresh_timeout_task (sc);
199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200               "Finished processing cadet request from client %p, ready to receive the next one\n",
201               sc);
202   GNUNET_CADET_receive_done (sc->channel);
203 }
204
205
206 /**
207  * Transmit the next entry from the write queue.
208  *
209  * @param sc where to process the write queue
210  */
211 static void
212 continue_writing (struct CadetClient *sc);
213
214
215 /**
216  * Send a reply now, cadet is ready.
217  *
218  * @param cls closure with the `struct CadetClient` which sent the query
219  * @param size number of bytes available in @a buf
220  * @param buf where to write the message
221  * @return number of bytes written to @a buf
222  */
223 static size_t
224 write_continuation (void *cls,
225                     size_t size,
226                     void *buf)
227 {
228   struct CadetClient *sc = cls;
229   struct GNUNET_CADET_Channel *tun;
230   struct WriteQueueItem *wqi;
231   size_t ret;
232
233   sc->wh = NULL;
234   if (NULL == (wqi = sc->wqi_head))
235   {
236     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
237                 "Write queue empty, reading more requests\n");
238     return 0;
239   }
240   if ( (0 == size) ||
241        (size < wqi->msize) )
242   {
243     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244                 "Transmission of reply failed, terminating cadet\n");
245     tun = sc->channel;
246     sc->channel = NULL;
247     GNUNET_CADET_channel_destroy (tun);
248     return 0;
249   }
250   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
251                                sc->wqi_tail,
252                                wqi);
253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254               "Transmitted %u byte reply via cadet to %p\n",
255               (unsigned int) size,
256               sc);
257   GNUNET_STATISTICS_update (GSF_stats,
258                             gettext_noop ("# Blocks transferred via cadet"), 1,
259                             GNUNET_NO);
260   ret = wqi->msize;
261   GNUNET_memcpy (buf, &wqi[1], ret);
262   GNUNET_free (wqi);
263   continue_writing (sc);
264   return ret;
265 }
266
267
268 /**
269  * Transmit the next entry from the write queue.
270  *
271  * @param sc where to process the write queue
272  */
273 static void
274 continue_writing (struct CadetClient *sc)
275 {
276   struct WriteQueueItem *wqi;
277   struct GNUNET_CADET_Channel *tun;
278
279   if (NULL != sc->wh)
280   {
281     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282                 "Write pending, waiting for it to complete\n");
283     return; /* write already pending */
284   }
285   if (NULL == (wqi = sc->wqi_head))
286   {
287     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288                 "Write queue empty, reading more requests\n");
289     continue_reading (sc);
290     return;
291   }
292   sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
293                                               GNUNET_TIME_UNIT_FOREVER_REL,
294                                               wqi->msize,
295                                               &write_continuation,
296                                               sc);
297   if (NULL == sc->wh)
298   {
299     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300                 "Write failed; terminating cadet\n");
301     tun = sc->channel;
302     sc->channel = NULL;
303     GNUNET_CADET_channel_destroy (tun);
304     return;
305   }
306 }
307
308
309 /**
310  * Process a datum that was stored in the datastore.
311  *
312  * @param cls closure with the `struct CadetClient` which sent the query
313  * @param key key for the content
314  * @param size number of bytes in @a data
315  * @param data content stored
316  * @param type type of the content
317  * @param priority priority of the content
318  * @param anonymity anonymity-level for the content
319  * @param expiration expiration time for the content
320  * @param uid unique identifier for the datum;
321  *        maybe 0 if no unique identifier is available
322  */
323 static void
324 handle_datastore_reply (void *cls,
325                         const struct GNUNET_HashCode *key,
326                         size_t size,
327                         const void *data,
328                         enum GNUNET_BLOCK_Type type,
329                         uint32_t priority,
330                         uint32_t anonymity,
331                         struct GNUNET_TIME_Absolute expiration,
332                         uint64_t uid)
333 {
334   struct CadetClient *sc = cls;
335   size_t msize = size + sizeof (struct CadetReplyMessage);
336   struct WriteQueueItem *wqi;
337   struct CadetReplyMessage *srm;
338
339   sc->qe = NULL;
340   if (NULL == data)
341   {
342     /* no result, this should not really happen, as for
343        non-anonymous routing only peers that HAVE the
344        answers should be queried; OTOH, this is not a
345        hard error as we might have had the answer in the
346        past and the user might have unindexed it. Hence
347        we log at level "INFO" for now. */
348     if (NULL == key)
349     {
350       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
351                   "Have no answer and the query was NULL\n");
352     }
353     else
354     {
355       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
356                   "Have no answer for query `%s'\n",
357                   GNUNET_h2s (key));
358     }
359     GNUNET_STATISTICS_update (GSF_stats,
360                               gettext_noop ("# queries received via CADET not answered"), 1,
361                               GNUNET_NO);
362     continue_writing (sc);
363     return;
364   }
365   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
366   {
367     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368                 "Performing on-demand encoding for query %s\n",
369                 GNUNET_h2s (key));
370     if (GNUNET_OK !=
371         GNUNET_FS_handle_on_demand_block (key,
372                                           size, data, type,
373                                           priority, anonymity,
374                                           expiration, uid,
375                                           &handle_datastore_reply,
376                                           sc))
377     {
378       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379                   "On-demand encoding request failed\n");
380       continue_writing (sc);
381     }
382     return;
383   }
384   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
385   {
386     GNUNET_break (0);
387     continue_writing (sc);
388     return;
389   }
390   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392               "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
393               (unsigned int) size,
394               (unsigned int) type,
395               GNUNET_h2s (key),
396               sc);
397   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
398   wqi->msize = msize;
399   srm = (struct CadetReplyMessage *) &wqi[1];
400   srm->header.size = htons ((uint16_t) msize);
401   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
402   srm->type = htonl (type);
403   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
404   GNUNET_memcpy (&srm[1], data, size);
405   sc->reply_size = msize;
406   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
407                                sc->wqi_tail,
408                                wqi);
409   continue_writing (sc);
410 }
411
412
413 /**
414  * Functions with this signature are called whenever a
415  * complete query message is received.
416  *
417  * Do not call #GNUNET_SERVER_mst_destroy() in callback
418  *
419  * @param cls closure with the `struct CadetClient`
420  * @param channel channel handle
421  * @param channel_ctx channel context
422  * @param message the actual message
423  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
424  */
425 static int
426 request_cb (void *cls,
427             struct GNUNET_CADET_Channel *channel,
428             void **channel_ctx,
429             const struct GNUNET_MessageHeader *message)
430 {
431   struct CadetClient *sc = *channel_ctx;
432   const struct CadetQueryMessage *sqm;
433
434   sqm = (const struct CadetQueryMessage *) message;
435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436               "Received query for `%s' via cadet from client %p\n",
437               GNUNET_h2s (&sqm->query),
438               sc);
439   GNUNET_STATISTICS_update (GSF_stats,
440                             gettext_noop ("# queries received via cadet"), 1,
441                             GNUNET_NO);
442   refresh_timeout_task (sc);
443   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
444                                      0,
445                                      &sqm->query,
446                                      ntohl (sqm->type),
447                                      0 /* priority */,
448                                      GSF_datastore_queue_size,
449                                      &handle_datastore_reply, sc);
450   if (NULL == sc->qe)
451   {
452     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453                 "Queueing request with datastore failed (queue full?)\n");
454     continue_writing (sc);
455   }
456   return GNUNET_OK;
457 }
458
459
460 /**
461  * Functions of this type are called upon new cadet connection from other peers.
462  *
463  * @param cls the closure from GNUNET_CADET_connect
464  * @param channel the channel representing the cadet
465  * @param initiator the identity of the peer who wants to establish a cadet
466  *            with us; NULL on binding error
467  * @param port cadet port used for the incoming connection
468  * @param options channel option flags
469  * @return initial channel context (our 'struct CadetClient')
470  */
471 static void *
472 accept_cb (void *cls,
473            struct GNUNET_CADET_Channel *channel,
474            const struct GNUNET_PeerIdentity *initiator,
475            const struct GNUNET_HashCode *port,
476            enum GNUNET_CADET_ChannelOption options)
477 {
478   struct CadetClient *sc;
479
480   GNUNET_assert (NULL != channel);
481   if (sc_count >= sc_count_max)
482   {
483     GNUNET_STATISTICS_update (GSF_stats,
484                               gettext_noop ("# cadet client connections rejected"), 1,
485                               GNUNET_NO);
486     GNUNET_CADET_channel_destroy (channel);
487     return NULL;
488   }
489   GNUNET_STATISTICS_update (GSF_stats,
490                             gettext_noop ("# cadet connections active"), 1,
491                             GNUNET_NO);
492   sc = GNUNET_new (struct CadetClient);
493   sc->channel = channel;
494   GNUNET_CONTAINER_DLL_insert (sc_head,
495                                sc_tail,
496                                sc);
497   sc_count++;
498   refresh_timeout_task (sc);
499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500               "Accepting inbound cadet connection from `%s' as client %p\n",
501               GNUNET_i2s (initiator),
502               sc);
503   return sc;
504 }
505
506
507 /**
508  * Function called by cadet when a client disconnects.
509  * Cleans up our 'struct CadetClient' of that channel.
510  *
511  * @param cls NULL
512  * @param channel channel of the disconnecting client
513  * @param channel_ctx our 'struct CadetClient'
514  */
515 static void
516 cleaner_cb (void *cls,
517             const struct GNUNET_CADET_Channel *channel,
518             void *channel_ctx)
519 {
520   struct CadetClient *sc = channel_ctx;
521   struct WriteQueueItem *wqi;
522
523   if (NULL == sc)
524     return;
525   sc->channel = NULL;
526   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
527               "Terminating cadet connection with client %p\n",
528               sc);
529   GNUNET_STATISTICS_update (GSF_stats,
530                             gettext_noop ("# cadet connections active"), -1,
531                             GNUNET_NO);
532   if (NULL != sc->terminate_task)
533     GNUNET_SCHEDULER_cancel (sc->terminate_task);
534   if (NULL != sc->timeout_task)
535     GNUNET_SCHEDULER_cancel (sc->timeout_task);
536   if (NULL != sc->wh)
537     GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
538   if (NULL != sc->qe)
539     GNUNET_DATASTORE_cancel (sc->qe);
540   while (NULL != (wqi = sc->wqi_head))
541   {
542     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
543                                  sc->wqi_tail,
544                                  wqi);
545     GNUNET_free (wqi);
546   }
547   GNUNET_CONTAINER_DLL_remove (sc_head,
548                                sc_tail,
549                                sc);
550   sc_count--;
551   GNUNET_free (sc);
552 }
553
554
555 /**
556  * Initialize subsystem for non-anonymous file-sharing.
557  */
558 void
559 GSF_cadet_start_server ()
560 {
561   static const struct GNUNET_CADET_MessageHandler handlers[] = {
562     { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)},
563     { NULL, 0, 0 }
564   };
565   struct GNUNET_HashCode port;
566
567   if (GNUNET_YES !=
568       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
569                                              "fs",
570                                              "MAX_CADET_CLIENTS",
571                                              &sc_count_max))
572     return;
573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574               "Initializing cadet FS server with a limit of %llu connections\n",
575               sc_count_max);
576   listen_channel = GNUNET_CADET_connect (GSF_cfg,
577                                          NULL,
578                                          &cleaner_cb,
579                                          handlers);
580   GNUNET_assert (NULL != listen_channel);
581   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
582                       strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
583                       &port);
584   GNUNET_CADET_open_port (listen_channel,
585                           &port,
586                           &accept_cb,
587                           NULL);
588 }
589
590
591 /**
592  * Shutdown subsystem for non-anonymous file-sharing.
593  */
594 void
595 GSF_cadet_stop_server ()
596 {
597   if (NULL != listen_channel)
598   {
599     GNUNET_CADET_disconnect (listen_channel);
600     listen_channel = NULL;
601   }
602   GNUNET_assert (NULL == sc_head);
603   GNUNET_assert (0 == sc_count);
604 }
605
606 /* end of gnunet-service-fs_cadet.c */