convert to new CADET API, not working due to CADET-API internal bugs
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_server.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cadet_server.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
39
40 /**
41  * After how long do we termiante idle connections?
42  */
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
44
45
46 /**
47  * A message in the queue to be written to the cadet.
48  */
49 struct WriteQueueItem
50 {
51   /**
52    * Kept in a DLL.
53    */
54   struct WriteQueueItem *next;
55
56   /**
57    * Kept in a DLL.
58    */
59   struct WriteQueueItem *prev;
60
61   /**
62    * Number of bytes of payload, allocated at the end of this struct.
63    */
64   size_t msize;
65 };
66
67
68 /**
69  * Information we keep around for each active cadeting client.
70  */
71 struct CadetClient
72 {
73   /**
74    * DLL
75    */
76   struct CadetClient *next;
77
78   /**
79    * DLL
80    */
81   struct CadetClient *prev;
82
83   /**
84    * Channel for communication.
85    */
86   struct GNUNET_CADET_Channel *channel;
87
88   /**
89    * Handle for active write operation, or NULL.
90    */
91   struct GNUNET_CADET_TransmitHandle *wh;
92
93   /**
94    * Head of write queue.
95    */
96   struct WriteQueueItem *wqi_head;
97
98   /**
99    * Tail of write queue.
100    */
101   struct WriteQueueItem *wqi_tail;
102
103   /**
104    * Current active request to the datastore, if we have one pending.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *qe;
107
108   /**
109    * Task that is scheduled to asynchronously terminate the connection.
110    */
111   struct GNUNET_SCHEDULER_Task * terminate_task;
112
113   /**
114    * Task that is scheduled to terminate idle connections.
115    */
116   struct GNUNET_SCHEDULER_Task * timeout_task;
117
118   /**
119    * Size of the last write that was initiated.
120    */
121   size_t reply_size;
122
123 };
124
125
126 /**
127  * Listen port for incoming requests.
128  */
129 static struct GNUNET_CADET_Port *cadet_port;
130
131 /**
132  * Head of DLL of cadet clients.
133  */
134 static struct CadetClient *sc_head;
135
136 /**
137  * Tail of DLL of cadet clients.
138  */
139 static struct CadetClient *sc_tail;
140
141 /**
142  * Number of active cadet clients in the 'sc_*'-DLL.
143  */
144 static unsigned int sc_count;
145
146 /**
147  * Maximum allowed number of cadet clients.
148  */
149 static unsigned long long sc_count_max;
150
151
152
153 /**
154  * Task run to asynchronously terminate the cadet due to timeout.
155  *
156  * @param cls the 'struct CadetClient'
157  */
158 static void
159 timeout_cadet_task (void *cls)
160 {
161   struct CadetClient *sc = cls;
162   struct GNUNET_CADET_Channel *tun;
163
164   sc->timeout_task = NULL;
165   tun = sc->channel;
166   sc->channel = NULL;
167   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
168               "Timeout for inactive cadet client %p\n",
169               sc);
170   GNUNET_CADET_channel_destroy (tun);
171 }
172
173
174 /**
175  * Reset the timeout for the cadet client (due to activity).
176  *
177  * @param sc client handle to reset timeout for
178  */
179 static void
180 refresh_timeout_task (struct CadetClient *sc)
181 {
182   if (NULL != sc->timeout_task)
183     GNUNET_SCHEDULER_cancel (sc->timeout_task);
184   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
185                                                    &timeout_cadet_task,
186                                                    sc);
187 }
188
189
190 /**
191  * Check if we are done with the write queue, and if so tell CADET
192  * that we are ready to read more.
193  *
194  * @param cls where to process the write queue
195  */
196 static void
197 continue_writing (void *cls)
198 {
199   struct CadetClient *sc = cls;
200   struct GNUNET_MQ_Handle *mq;
201
202   mq = GNUNET_CADET_get_mq (sc->channel);
203   if (0 != GNUNET_MQ_get_length (mq))
204   {
205     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
206                 "Write pending, waiting for it to complete\n");
207     return;
208   }
209   refresh_timeout_task (sc);
210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
211               "Finished processing cadet request from client %p, ready to receive the next one\n",
212               sc);
213   GNUNET_CADET_receive_done (sc->channel);
214 }
215
216
217 /**
218  * Process a datum that was stored in the datastore.
219  *
220  * @param cls closure with the `struct CadetClient` which sent the query
221  * @param key key for the content
222  * @param size number of bytes in @a data
223  * @param data content stored
224  * @param type type of the content
225  * @param priority priority of the content
226  * @param anonymity anonymity-level for the content
227  * @param expiration expiration time for the content
228  * @param uid unique identifier for the datum;
229  *        maybe 0 if no unique identifier is available
230  */
231 static void
232 handle_datastore_reply (void *cls,
233                         const struct GNUNET_HashCode *key,
234                         size_t size,
235                         const void *data,
236                         enum GNUNET_BLOCK_Type type,
237                         uint32_t priority,
238                         uint32_t anonymity,
239                         struct GNUNET_TIME_Absolute expiration,
240                         uint64_t uid)
241 {
242   struct CadetClient *sc = cls;
243   size_t msize = size + sizeof (struct CadetReplyMessage);
244   struct GNUNET_MQ_Envelope *env;
245   struct CadetReplyMessage *srm;
246
247   sc->qe = NULL;
248   if (NULL == data)
249   {
250     /* no result, this should not really happen, as for
251        non-anonymous routing only peers that HAVE the
252        answers should be queried; OTOH, this is not a
253        hard error as we might have had the answer in the
254        past and the user might have unindexed it. Hence
255        we log at level "INFO" for now. */
256     if (NULL == key)
257     {
258       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
259                   "Have no answer and the query was NULL\n");
260     }
261     else
262     {
263       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
264                   "Have no answer for query `%s'\n",
265                   GNUNET_h2s (key));
266     }
267     GNUNET_STATISTICS_update (GSF_stats,
268                               gettext_noop ("# queries received via CADET not answered"),
269                               1,
270                               GNUNET_NO);
271     continue_writing (sc);
272     return;
273   }
274   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
275   {
276     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
277                 "Performing on-demand encoding for query %s\n",
278                 GNUNET_h2s (key));
279     if (GNUNET_OK !=
280         GNUNET_FS_handle_on_demand_block (key,
281                                           size,
282                                           data,
283                                           type,
284                                           priority,
285                                           anonymity,
286                                           expiration,
287                                           uid,
288                                           &handle_datastore_reply,
289                                           sc))
290     {
291       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
292                   "On-demand encoding request failed\n");
293       continue_writing (sc);
294     }
295     return;
296   }
297   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
298   {
299     GNUNET_break (0);
300     continue_writing (sc);
301     return;
302   }
303   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
304   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
305               "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
306               (unsigned int) size,
307               (unsigned int) type,
308               GNUNET_h2s (key),
309               sc);
310   env = GNUNET_MQ_msg_extra (srm,
311                              size,
312                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
313   srm->type = htonl (type);
314   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
315   GNUNET_memcpy (&srm[1],
316                  data,
317                  size);
318   GNUNET_MQ_notify_sent (env,
319                          &continue_writing,
320                          sc);
321   GNUNET_STATISTICS_update (GSF_stats,
322                             gettext_noop ("# Blocks transferred via cadet"),
323                             1,
324                             GNUNET_NO);
325   GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
326                   env);
327 }
328
329
330 /**
331  * Functions with this signature are called whenever a
332  * complete query message is received.
333  *
334  * @param cls closure with the `struct CadetClient`
335  * @param sqm the actual message
336  */
337 static void
338 handle_request (void *cls,
339                 const struct CadetQueryMessage *sqm)
340 {
341   struct CadetClient *sc = cls;
342
343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
344               "Received query for `%s' via cadet from client %p\n",
345               GNUNET_h2s (&sqm->query),
346               sc);
347   GNUNET_STATISTICS_update (GSF_stats,
348                             gettext_noop ("# queries received via cadet"),
349                             1,
350                             GNUNET_NO);
351   refresh_timeout_task (sc);
352   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
353                                      0,
354                                      &sqm->query,
355                                      ntohl (sqm->type),
356                                      0 /* priority */,
357                                      GSF_datastore_queue_size,
358                                      &handle_datastore_reply,
359                                      sc);
360   if (NULL == sc->qe)
361   {
362     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
363                 "Queueing request with datastore failed (queue full?)\n");
364     continue_writing (sc);
365   }
366 }
367
368
369 /**
370  * Functions of this type are called upon new cadet connection from other peers.
371  *
372  * @param cls the closure from GNUNET_CADET_connect
373  * @param channel the channel representing the cadet
374  * @param initiator the identity of the peer who wants to establish a cadet
375  *            with us; NULL on binding error
376  * @return initial channel context (our `struct CadetClient`)
377  */
378 static void *
379 connect_cb (void *cls,
380             struct GNUNET_CADET_Channel *channel,
381             const struct GNUNET_PeerIdentity *initiator)
382 {
383   struct CadetClient *sc;
384
385   GNUNET_assert (NULL != channel);
386   if (sc_count >= sc_count_max)
387   {
388     GNUNET_STATISTICS_update (GSF_stats,
389                               gettext_noop ("# cadet client connections rejected"),
390                               1,
391                               GNUNET_NO);
392     GNUNET_CADET_channel_destroy (channel);
393     return NULL;
394   }
395   GNUNET_STATISTICS_update (GSF_stats,
396                             gettext_noop ("# cadet connections active"),
397                             1,
398                             GNUNET_NO);
399   sc = GNUNET_new (struct CadetClient);
400   sc->channel = channel;
401   GNUNET_CONTAINER_DLL_insert (sc_head,
402                                sc_tail,
403                                sc);
404   sc_count++;
405   refresh_timeout_task (sc);
406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407               "Accepting inbound cadet connection from `%s' as client %p\n",
408               GNUNET_i2s (initiator),
409               sc);
410   return sc;
411 }
412
413
414 /**
415  * Function called by cadet when a client disconnects.
416  * Cleans up our `struct CadetClient` of that channel.
417  *
418  * @param cls  our `struct CadetClient`
419  * @param channel channel of the disconnecting client
420  * @param channel_ctx
421  */
422 static void
423 disconnect_cb (void *cls,
424                const struct GNUNET_CADET_Channel *channel)
425 {
426   struct CadetClient *sc = cls;
427   struct WriteQueueItem *wqi;
428
429   if (NULL == sc)
430     return;
431   sc->channel = NULL;
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "Terminating cadet connection with client %p\n",
434               sc);
435   GNUNET_STATISTICS_update (GSF_stats,
436                             gettext_noop ("# cadet connections active"), -1,
437                             GNUNET_NO);
438   if (NULL != sc->terminate_task)
439     GNUNET_SCHEDULER_cancel (sc->terminate_task);
440   if (NULL != sc->timeout_task)
441     GNUNET_SCHEDULER_cancel (sc->timeout_task);
442   if (NULL != sc->wh)
443     GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
444   if (NULL != sc->qe)
445     GNUNET_DATASTORE_cancel (sc->qe);
446   while (NULL != (wqi = sc->wqi_head))
447   {
448     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
449                                  sc->wqi_tail,
450                                  wqi);
451     GNUNET_free (wqi);
452   }
453   GNUNET_CONTAINER_DLL_remove (sc_head,
454                                sc_tail,
455                                sc);
456   sc_count--;
457   GNUNET_free (sc);
458 }
459
460
461
462 /**
463  * Function called whenever an MQ-channel's transmission window size changes.
464  *
465  * The first callback in an outgoing channel will be with a non-zero value
466  * and will mean the channel is connected to the destination.
467  *
468  * For an incoming channel it will be called immediately after the
469  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
470  *
471  * @param cls Channel closure.
472  * @param channel Connection to the other end (henceforth invalid).
473  * @param window_size New window size. If the is more messages than buffer size
474  *                    this value will be negative..
475  */
476 static void
477 window_change_cb (void *cls,
478                   const struct GNUNET_CADET_Channel *channel,
479                   int window_size)
480 {
481   /* FIXME: could do flow control here... */
482 }
483
484
485 /**
486  * Initialize subsystem for non-anonymous file-sharing.
487  */
488 void
489 GSF_cadet_start_server ()
490 {
491   struct GNUNET_MQ_MessageHandler handlers[] = {
492     GNUNET_MQ_hd_fixed_size (request,
493                              GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
494                              struct CadetQueryMessage,
495                              NULL),
496     GNUNET_MQ_handler_end ()
497   };
498   struct GNUNET_HashCode port;
499
500   if (GNUNET_YES !=
501       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
502                                              "fs",
503                                              "MAX_CADET_CLIENTS",
504                                              &sc_count_max))
505     return;
506   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507               "Initializing cadet FS server with a limit of %llu connections\n",
508               sc_count_max);
509   cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
510   cadet_handle = GNUNET_CADET_connecT (GSF_cfg);
511   GNUNET_assert (NULL != cadet_handle);
512   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
513                       strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
514                       &port);
515   cadet_port = GNUNET_CADET_open_porT (cadet_handle,
516                                        &port,
517                                        &connect_cb,
518                                        NULL,
519                                        &window_change_cb,
520                                        &disconnect_cb,
521                                        handlers);
522 }
523
524
525 /**
526  * Shutdown subsystem for non-anonymous file-sharing.
527  */
528 void
529 GSF_cadet_stop_server ()
530 {
531   GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
532                                          &GSF_cadet_release_clients,
533                                          NULL);
534   GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
535   cadet_map = NULL;
536   if (NULL != cadet_port)
537   {
538     GNUNET_CADET_close_port (cadet_port);
539     cadet_port = NULL;
540   }
541   if (NULL != cadet_handle)
542   {
543     GNUNET_CADET_disconnect (cadet_handle);
544     cadet_handle = NULL;
545   }
546   GNUNET_assert (NULL == sc_head);
547   GNUNET_assert (0 == sc_count);
548 }
549
550 /* end of gnunet-service-fs_cadet.c */