flush peer respect value on disconnect
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_server.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cadet_server.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
39
40 /**
41  * After how long do we termiante idle connections?
42  */
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
44
45
46 /**
47  * A message in the queue to be written to the cadet.
48  */
49 struct WriteQueueItem
50 {
51   /**
52    * Kept in a DLL.
53    */
54   struct WriteQueueItem *next;
55
56   /**
57    * Kept in a DLL.
58    */
59   struct WriteQueueItem *prev;
60
61   /**
62    * Number of bytes of payload, allocated at the end of this struct.
63    */
64   size_t msize;
65 };
66
67
68 /**
69  * Information we keep around for each active cadeting client.
70  */
71 struct CadetClient
72 {
73   /**
74    * DLL
75    */
76   struct CadetClient *next;
77
78   /**
79    * DLL
80    */
81   struct CadetClient *prev;
82
83   /**
84    * Channel for communication.
85    */
86   struct GNUNET_CADET_Channel *channel;
87
88   /**
89    * Handle for active write operation, or NULL.
90    */
91   struct GNUNET_CADET_TransmitHandle *wh;
92
93   /**
94    * Head of write queue.
95    */
96   struct WriteQueueItem *wqi_head;
97
98   /**
99    * Tail of write queue.
100    */
101   struct WriteQueueItem *wqi_tail;
102
103   /**
104    * Current active request to the datastore, if we have one pending.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *qe;
107
108   /**
109    * Task that is scheduled to asynchronously terminate the connection.
110    */
111   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
112
113   /**
114    * Task that is scheduled to terminate idle connections.
115    */
116   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
117
118   /**
119    * Size of the last write that was initiated.
120    */
121   size_t reply_size;
122
123 };
124
125
126 /**
127  * Listen channel for incoming requests.
128  */
129 static struct GNUNET_CADET_Handle *listen_channel;
130
131 /**
132  * Head of DLL of cadet clients.
133  */
134 static struct CadetClient *sc_head;
135
136 /**
137  * Tail of DLL of cadet clients.
138  */
139 static struct CadetClient *sc_tail;
140
141 /**
142  * Number of active cadet clients in the 'sc_*'-DLL.
143  */
144 static unsigned int sc_count;
145
146 /**
147  * Maximum allowed number of cadet clients.
148  */
149 static unsigned long long sc_count_max;
150
151
152
153 /**
154  * Task run to asynchronously terminate the cadet due to timeout.
155  *
156  * @param cls the 'struct CadetClient'
157  * @param tc scheduler context
158  */
159 static void
160 timeout_cadet_task (void *cls,
161                      const struct GNUNET_SCHEDULER_TaskContext *tc)
162 {
163   struct CadetClient *sc = cls;
164   struct GNUNET_CADET_Channel *tun;
165
166   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
167   tun = sc->channel;
168   sc->channel = NULL;
169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
170               "Timeout for inactive cadet client %p\n",
171               sc);
172   GNUNET_CADET_channel_destroy (tun);
173 }
174
175
176 /**
177  * Reset the timeout for the cadet client (due to activity).
178  *
179  * @param sc client handle to reset timeout for
180  */
181 static void
182 refresh_timeout_task (struct CadetClient *sc)
183 {
184   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
185     GNUNET_SCHEDULER_cancel (sc->timeout_task);
186   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
187                                                    &timeout_cadet_task,
188                                                    sc);
189 }
190
191
192 /**
193  * We're done handling a request from a client, read the next one.
194  *
195  * @param sc client to continue reading requests from
196  */
197 static void
198 continue_reading (struct CadetClient *sc)
199 {
200   refresh_timeout_task (sc);
201   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
202               "Finished processing cadet request from client %p, ready to receive the next one\n",
203               sc);
204   GNUNET_CADET_receive_done (sc->channel);
205 }
206
207
208 /**
209  * Transmit the next entry from the write queue.
210  *
211  * @param sc where to process the write queue
212  */
213 static void
214 continue_writing (struct CadetClient *sc);
215
216
217 /**
218  * Send a reply now, cadet is ready.
219  *
220  * @param cls closure with the `struct CadetClient` which sent the query
221  * @param size number of bytes available in @a buf
222  * @param buf where to write the message
223  * @return number of bytes written to @a buf
224  */
225 static size_t
226 write_continuation (void *cls,
227                     size_t size,
228                     void *buf)
229 {
230   struct CadetClient *sc = cls;
231   struct GNUNET_CADET_Channel *tun;
232   struct WriteQueueItem *wqi;
233   size_t ret;
234
235   sc->wh = NULL;
236   if (NULL == (wqi = sc->wqi_head))
237   {
238     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
239                 "Write queue empty, reading more requests\n");
240     return 0;
241   }
242   if ( (0 == size) ||
243        (size < wqi->msize) )
244   {
245     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
246                 "Transmission of reply failed, terminating cadet\n");
247     tun = sc->channel;
248     sc->channel = NULL;
249     GNUNET_CADET_channel_destroy (tun);
250     return 0;
251   }
252   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
253                                sc->wqi_tail,
254                                wqi);
255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
256               "Transmitted %u byte reply via cadet to %p\n",
257               (unsigned int) size,
258               sc);
259   GNUNET_STATISTICS_update (GSF_stats,
260                             gettext_noop ("# Blocks transferred via cadet"), 1,
261                             GNUNET_NO);
262   memcpy (buf, &wqi[1], ret = wqi->msize);
263   GNUNET_free (wqi);
264   continue_writing (sc);
265   return ret;
266 }
267
268
269 /**
270  * Transmit the next entry from the write queue.
271  *
272  * @param sc where to process the write queue
273  */
274 static void
275 continue_writing (struct CadetClient *sc)
276 {
277   struct WriteQueueItem *wqi;
278   struct GNUNET_CADET_Channel *tun;
279
280   if (NULL != sc->wh)
281   {
282     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
283                 "Write pending, waiting for it to complete\n");
284     return; /* write already pending */
285   }
286   if (NULL == (wqi = sc->wqi_head))
287   {
288     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289                 "Write queue empty, reading more requests\n");
290     continue_reading (sc);
291     return;
292   }
293   sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
294                                               GNUNET_TIME_UNIT_FOREVER_REL,
295                                               wqi->msize,
296                                               &write_continuation,
297                                               sc);
298   if (NULL == sc->wh)
299   {
300     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
301                 "Write failed; terminating cadet\n");
302     tun = sc->channel;
303     sc->channel = NULL;
304     GNUNET_CADET_channel_destroy (tun);
305     return;
306   }
307 }
308
309
310 /**
311  * Process a datum that was stored in the datastore.
312  *
313  * @param cls closure with the `struct CadetClient` which sent the query
314  * @param key key for the content
315  * @param size number of bytes in @a data
316  * @param data content stored
317  * @param type type of the content
318  * @param priority priority of the content
319  * @param anonymity anonymity-level for the content
320  * @param expiration expiration time for the content
321  * @param uid unique identifier for the datum;
322  *        maybe 0 if no unique identifier is available
323  */
324 static void
325 handle_datastore_reply (void *cls,
326                         const struct GNUNET_HashCode *key,
327                         size_t size, 
328                         const void *data,
329                         enum GNUNET_BLOCK_Type type,
330                         uint32_t priority,
331                         uint32_t anonymity,
332                         struct GNUNET_TIME_Absolute expiration,
333                         uint64_t uid)
334 {
335   struct CadetClient *sc = cls;
336   size_t msize = size + sizeof (struct CadetReplyMessage);
337   struct WriteQueueItem *wqi;
338   struct CadetReplyMessage *srm;
339
340   sc->qe = NULL;
341   if (NULL == data)
342   {
343     /* no result, this should not really happen, as for
344        non-anonymous routing only peers that HAVE the
345        answers should be queried; OTOH, this is not a
346        hard error as we might have had the answer in the
347        past and the user might have unindexed it. Hence
348        we log at level "INFO" for now. */
349     if (NULL == key)
350     {
351       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
352                   "Have no answer and the query was NULL\n");
353     }
354     else
355     {
356       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
357                   "Have no answer for query `%s'\n",
358                   GNUNET_h2s (key));
359     }
360     GNUNET_STATISTICS_update (GSF_stats,
361                               gettext_noop ("# queries received via CADET not answered"), 1,
362                               GNUNET_NO);
363     continue_writing (sc);
364     return;
365   }
366   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
367   {
368     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
369                 "Performing on-demand encoding for query %s\n",
370                 GNUNET_h2s (key));
371     if (GNUNET_OK !=
372         GNUNET_FS_handle_on_demand_block (key,
373                                           size, data, type,
374                                           priority, anonymity,
375                                           expiration, uid,
376                                           &handle_datastore_reply,
377                                           sc))
378     {
379       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
380                   "On-demand encoding request failed\n");
381       continue_writing (sc);
382     }
383     return;
384   }
385   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
386   {
387     GNUNET_break (0);
388     continue_writing (sc);
389     return;
390   }
391   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393               "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
394               (unsigned int) size,
395               (unsigned int) type,
396               GNUNET_h2s (key),
397               sc);
398   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
399   wqi->msize = msize;
400   srm = (struct CadetReplyMessage *) &wqi[1];
401   srm->header.size = htons ((uint16_t) msize);
402   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
403   srm->type = htonl (type);
404   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
405   memcpy (&srm[1], data, size);
406   sc->reply_size = msize;
407   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
408                                sc->wqi_tail,
409                                wqi);
410   continue_writing (sc);
411 }
412
413
414 /**
415  * Functions with this signature are called whenever a
416  * complete query message is received.
417  *
418  * Do not call #GNUNET_SERVER_mst_destroy() in callback
419  *
420  * @param cls closure with the `struct CadetClient`
421  * @param channel channel handle
422  * @param channel_ctx channel context
423  * @param message the actual message
424  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
425  */
426 static int
427 request_cb (void *cls,
428             struct GNUNET_CADET_Channel *channel,
429             void **channel_ctx,
430             const struct GNUNET_MessageHeader *message)
431 {
432   struct CadetClient *sc = *channel_ctx;
433   const struct CadetQueryMessage *sqm;
434
435   sqm = (const struct CadetQueryMessage *) message;
436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437               "Received query for `%s' via cadet from client %p\n",
438               GNUNET_h2s (&sqm->query),
439               sc);
440   GNUNET_STATISTICS_update (GSF_stats,
441                             gettext_noop ("# queries received via cadet"), 1,
442                             GNUNET_NO);
443   refresh_timeout_task (sc);
444   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
445                                      0,
446                                      &sqm->query,
447                                      ntohl (sqm->type),
448                                      0 /* priority */,
449                                      GSF_datastore_queue_size,
450                                      GNUNET_TIME_UNIT_FOREVER_REL,
451                                      &handle_datastore_reply, sc);
452   if (NULL == sc->qe)
453   {
454     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455                 "Queueing request with datastore failed (queue full?)\n");
456     continue_writing (sc);
457   }
458   return GNUNET_OK;
459 }
460
461
462 /**
463  * Functions of this type are called upon new cadet connection from other peers.
464  *
465  * @param cls the closure from GNUNET_CADET_connect
466  * @param channel the channel representing the cadet
467  * @param initiator the identity of the peer who wants to establish a cadet
468  *            with us; NULL on binding error
469  * @param port cadet port used for the incoming connection
470  * @param options channel option flags
471  * @return initial channel context (our 'struct CadetClient')
472  */
473 static void *
474 accept_cb (void *cls,
475            struct GNUNET_CADET_Channel *channel,
476            const struct GNUNET_PeerIdentity *initiator,
477            uint32_t port, enum GNUNET_CADET_ChannelOption options)
478 {
479   struct CadetClient *sc;
480
481   GNUNET_assert (NULL != channel);
482   if (sc_count >= sc_count_max)
483   {
484     GNUNET_STATISTICS_update (GSF_stats,
485                               gettext_noop ("# cadet client connections rejected"), 1,
486                               GNUNET_NO);
487     GNUNET_CADET_channel_destroy (channel);
488     return NULL;
489   }
490   GNUNET_STATISTICS_update (GSF_stats,
491                             gettext_noop ("# cadet connections active"), 1,
492                             GNUNET_NO);
493   sc = GNUNET_new (struct CadetClient);
494   sc->channel = channel;
495   GNUNET_CONTAINER_DLL_insert (sc_head,
496                                sc_tail,
497                                sc);
498   sc_count++;
499   refresh_timeout_task (sc);
500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501               "Accepting inbound cadet connection from `%s' as client %p\n",
502               GNUNET_i2s (initiator),
503               sc);
504   return sc;
505 }
506
507
508 /**
509  * Function called by cadet when a client disconnects.
510  * Cleans up our 'struct CadetClient' of that channel.
511  *
512  * @param cls NULL
513  * @param channel channel of the disconnecting client
514  * @param channel_ctx our 'struct CadetClient'
515  */
516 static void
517 cleaner_cb (void *cls,
518             const struct GNUNET_CADET_Channel *channel,
519             void *channel_ctx)
520 {
521   struct CadetClient *sc = channel_ctx;
522   struct WriteQueueItem *wqi;
523
524   if (NULL == sc)
525     return;
526   sc->channel = NULL;
527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
528               "Terminating cadet connection with client %p\n",
529               sc);
530   GNUNET_STATISTICS_update (GSF_stats,
531                             gettext_noop ("# cadet connections active"), -1,
532                             GNUNET_NO);
533   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
534     GNUNET_SCHEDULER_cancel (sc->terminate_task);
535   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
536     GNUNET_SCHEDULER_cancel (sc->timeout_task);
537   if (NULL != sc->wh)
538     GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
539   if (NULL != sc->qe)
540     GNUNET_DATASTORE_cancel (sc->qe);
541   while (NULL != (wqi = sc->wqi_head))
542   {
543     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
544                                  sc->wqi_tail,
545                                  wqi);
546     GNUNET_free (wqi);
547   }
548   GNUNET_CONTAINER_DLL_remove (sc_head,
549                                sc_tail,
550                                sc);
551   sc_count--;
552   GNUNET_free (sc);
553 }
554
555
556 /**
557  * Initialize subsystem for non-anonymous file-sharing.
558  */
559 void
560 GSF_cadet_start_server ()
561 {
562   static const struct GNUNET_CADET_MessageHandler handlers[] = {
563     { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)},
564     { NULL, 0, 0 }
565   };
566   static const uint32_t ports[] = {
567     GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
568     0
569   };
570
571   if (GNUNET_YES !=
572       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
573                                              "fs",
574                                              "MAX_CADET_CLIENTS",
575                                              &sc_count_max))
576     return;
577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578               "Initializing cadet FS server with a limit of %llu connections\n",
579               sc_count_max);
580   listen_channel = GNUNET_CADET_connect (GSF_cfg,
581                                        NULL,
582                                        &accept_cb,
583                                        &cleaner_cb,
584                                        handlers,
585                                        ports);
586 }
587
588
589 /**
590  * Shutdown subsystem for non-anonymous file-sharing.
591  */
592 void
593 GSF_cadet_stop_server ()
594 {
595   if (NULL != listen_channel)
596   {
597     GNUNET_CADET_disconnect (listen_channel);
598     listen_channel = NULL;
599   }
600   GNUNET_assert (NULL == sc_head);
601   GNUNET_assert (0 == sc_count);
602 }
603
604 /* end of gnunet-service-fs_cadet.c */