mostly finishing server-side for FS-over-stream
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - add statistics
28  * - limit # concurrent clients, timeout for read
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_stream_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_stream.h"
39
40 /**
41  * Information we keep around for each active streaming client.
42  */
43 struct StreamClient
44 {
45   /**
46    * DLL
47    */ 
48   struct StreamClient *next;
49
50   /**
51    * DLL
52    */ 
53   struct StreamClient *prev;
54
55   /**
56    * Socket for communication.
57    */ 
58   struct GNUNET_STREAM_Socket *socket;
59
60   /**
61    * Handle for active read operation, or NULL.
62    */ 
63   struct GNUNET_STREAM_IOReadHandle *rh;
64
65   /**
66    * Handle for active write operation, or NULL.
67    */ 
68   struct GNUNET_STREAM_IOWriteHandle *wh;
69   
70   /**
71    * Tokenizer for requests.
72    */
73   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
74   
75   /**
76    * Current active request to the datastore, if we have one pending.
77    */
78   struct GNUNET_DATASTORE_QueueEntry *qe;
79
80   /**
81    * Size of the last write that was initiated.
82    */ 
83   size_t reply_size;
84
85 };
86
87
88 /**
89  * Query from one peer, asking the other for CHK-data.
90  */
91 struct StreamQueryMessage
92 {
93
94   /**
95    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
96    */
97   struct GNUNET_MessageHeader header;
98
99   /**
100    * Block type must be DBLOCK or IBLOCK.
101    */
102   uint32_t type;
103
104   /**
105    * Query hash from CHK (hash of encrypted block).
106    */
107   struct GNUNET_HashCode query;
108
109 };
110
111
112 /**
113  * Reply to a StreamQueryMessage.
114  */
115 struct StreamReplyMessage
116 {
117
118   /**
119    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
120    */
121   struct GNUNET_MessageHeader header;
122
123   /**
124    * Block type must be DBLOCK or IBLOCK.
125    */
126   uint32_t type;
127
128   /**
129    * Expiration time for the block.
130    */
131   struct GNUNET_TIME_AbsoluteNBO expiration;
132
133   /* followed by the encrypted block */
134
135 };
136
137
138 /**
139  * Listen socket for incoming requests.
140  */
141 static struct GNUNET_STREAM_ListenSocket *listen_socket;
142
143 /**
144  * Head of DLL of stream clients.
145  */ 
146 static struct StreamClient *sc_head;
147
148 /**
149  * Tail of DLL of stream clients.
150  */ 
151 static struct StreamClient *sc_tail;
152
153
154 /**
155  * We're done with a particular client, clean up.
156  *
157  * @param sc client to clean up
158  */
159 static void
160 terminate_stream (struct StreamClient *sc)
161 {
162   if (NULL != sc->rh)
163     GNUNET_STREAM_io_read_cancel (sc->rh);
164   if (NULL != sc->wh)
165     GNUNET_STREAM_io_write_cancel (sc->wh);
166   if (NULL != sc->qe)
167     GNUNET_DATASTORE_cancel (sc->qe);
168   GNUNET_SERVER_mst_destroy (sc->mst);
169   GNUNET_STREAM_close (sc->socket);
170   GNUNET_CONTAINER_DLL_remove (sc_head,
171                                sc_tail,
172                                sc);
173   GNUNET_free (sc);
174 }
175
176
177 /**
178  * Functions of this signature are called whenever data is available from the
179  * stream.
180  *
181  * @param cls the closure from GNUNET_STREAM_read
182  * @param status the status of the stream at the time this function is called
183  * @param data traffic from the other side
184  * @param size the number of bytes available in data read; will be 0 on timeout 
185  * @return number of bytes of processed from 'data' (any data remaining should be
186  *         given to the next time the read processor is called).
187  */
188 static size_t 
189 process_request (void *cls,
190                  enum GNUNET_STREAM_Status status,
191                  const void *data,
192                  size_t size);
193
194
195 /**
196  * We're done handling a request from a client, read the next one.
197  *
198  * @param sc client to continue reading requests from
199  */
200 static void
201 continue_reading (struct StreamClient *sc)
202 {
203   int ret;
204
205   ret = 
206     GNUNET_SERVER_mst_receive (sc->mst,
207                                NULL,
208                                NULL, 0,
209                                GNUNET_NO, GNUNET_YES);
210   if (GNUNET_NO == ret)
211     return; 
212   sc->rh = GNUNET_STREAM_read (sc->socket,
213                                GNUNET_TIME_UNIT_FOREVER_REL,
214                                &process_request,
215                                sc);      
216 }
217
218
219 /**
220  * Functions of this signature are called whenever data is available from the
221  * stream.
222  *
223  * @param cls the closure from GNUNET_STREAM_read
224  * @param status the status of the stream at the time this function is called
225  * @param data traffic from the other side
226  * @param size the number of bytes available in data read; will be 0 on timeout 
227  * @return number of bytes of processed from 'data' (any data remaining should be
228  *         given to the next time the read processor is called).
229  */
230 static size_t 
231 process_request (void *cls,
232                  enum GNUNET_STREAM_Status status,
233                  const void *data,
234                  size_t size)
235 {
236   struct StreamClient *sc = cls;
237   int ret;
238
239   sc->rh = NULL;
240   switch (status)
241   {
242   case GNUNET_STREAM_OK:
243     ret = 
244       GNUNET_SERVER_mst_receive (sc->mst,
245                                  NULL,
246                                  data, size,
247                                  GNUNET_NO, GNUNET_YES);
248     if (GNUNET_NO == ret)
249       return size; /* more messages in MST */
250     if (GNUNET_SYSERR == ret)
251     {
252       GNUNET_break_op (0);
253       terminate_stream (sc);
254       return size;
255     }
256     break;
257   case GNUNET_STREAM_TIMEOUT:
258   case GNUNET_STREAM_SHUTDOWN:
259   case GNUNET_STREAM_SYSERR:
260   case GNUNET_STREAM_BROKEN:
261     terminate_stream (sc);
262     return size;
263   default:
264     GNUNET_break (0);
265     return size;
266   }
267   continue_reading (sc);
268   return size;
269 }
270
271
272 /**
273  * Sending a reply was completed, continue processing.
274  *
275  * @param cls closure with the struct StreamClient which sent the query
276  */
277 static void
278 write_continuation (void *cls,
279                     enum GNUNET_STREAM_Status status,
280                     size_t size)
281 {
282   struct StreamClient *sc = cls;
283   
284   sc->wh = NULL;
285   if ( (GNUNET_STREAM_OK == status) &&
286        (size == sc->reply_size) )
287     continue_reading (sc);
288   else
289     terminate_stream (sc);    
290 }
291
292
293 /**
294  * Process a datum that was stored in the datastore.
295  *
296  * @param cls closure with the struct StreamClient which sent the query
297  * @param key key for the content
298  * @param size number of bytes in data
299  * @param data content stored
300  * @param type type of the content
301  * @param priority priority of the content
302  * @param anonymity anonymity-level for the content
303  * @param expiration expiration time for the content
304  * @param uid unique identifier for the datum;
305  *        maybe 0 if no unique identifier is available
306  */
307 static void 
308 handle_datastore_reply (void *cls,
309                         const struct GNUNET_HashCode * key,
310                         size_t size, const void *data,
311                         enum GNUNET_BLOCK_Type type,
312                         uint32_t priority,
313                         uint32_t anonymity,
314                         struct GNUNET_TIME_Absolute
315                         expiration, uint64_t uid)
316 {
317   struct StreamClient *sc = cls;
318   size_t msize = size + sizeof (struct StreamReplyMessage);
319   char buf[msize] GNUNET_ALIGN;
320   struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
321
322   sc->qe = NULL;
323   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
324   {
325     if (GNUNET_OK !=
326         GNUNET_FS_handle_on_demand_block (key,
327                                           size, data, type,
328                                           priority, anonymity,
329                                           expiration, uid,
330                                           &handle_datastore_reply,
331                                           sc))
332     {
333       continue_reading (sc);
334     }
335     return;
336   }
337   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
338   {
339     GNUNET_break (0);
340     continue_reading (sc);
341     return;
342   }
343   srm->header.size = htons ((uint16_t) msize);
344   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
345   srm->type = htonl (type);
346   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
347   memcpy (&srm[1], data, size);
348   sc->reply_size = msize;
349   sc->wh = GNUNET_STREAM_write (sc->socket,
350                                 buf, msize,
351                                 GNUNET_TIME_UNIT_FOREVER_REL,
352                                 &write_continuation,
353                                 sc);
354   if (NULL == sc->wh)
355   {
356     terminate_stream (sc);
357     return;
358   }
359 }
360
361
362 /**
363  * Functions with this signature are called whenever a
364  * complete message is received.
365  *
366  * Do not call GNUNET_SERVER_mst_destroy in callback
367  *
368  * @param cls closure with the 'struct StreamClient'
369  * @param client identification of the client, NULL
370  * @param message the actual message
371  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
372  */
373 static int
374 request_cb (void *cls,
375             void *client,
376             const struct GNUNET_MessageHeader *message)
377 {
378   struct StreamClient *sc = cls;
379   const struct StreamQueryMessage *sqm;
380
381   switch (ntohs (message->type))
382   {
383   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
384     if (sizeof (struct StreamQueryMessage) != 
385         ntohs (message->size))
386     {
387       GNUNET_break_op (0);
388       return GNUNET_SYSERR;
389     }
390     sqm = (const struct StreamQueryMessage *) message;
391     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                 "Received query for `%s' via stream\n",
393                 GNUNET_h2s (&sqm->query));
394     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
395                                        0,
396                                        &sqm->query,
397                                        ntohl (sqm->type),
398                                        0 /* FIXME: priority */, 
399                                        GSF_datastore_queue_size,
400                                        GNUNET_TIME_UNIT_FOREVER_REL,
401                                        &handle_datastore_reply, sc);
402     if (NULL == sc->qe)
403       continue_reading (sc);
404     return GNUNET_OK;
405   default:
406     GNUNET_break_op (0);
407     return GNUNET_SYSERR;
408   }
409 }
410
411
412 /**
413  * Functions of this type are called upon new stream connection from other peers
414  * or upon binding error which happen when the app_port given in
415  * GNUNET_STREAM_listen() is already taken.
416  *
417  * @param cls the closure from GNUNET_STREAM_listen
418  * @param socket the socket representing the stream; NULL on binding error
419  * @param initiator the identity of the peer who wants to establish a stream
420  *            with us; NULL on binding error
421  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
422  *             stream (the socket will be invalid after the call)
423  */
424 static int 
425 accept_cb (void *cls,
426            struct GNUNET_STREAM_Socket *socket,
427            const struct GNUNET_PeerIdentity *initiator)
428 {
429   struct StreamClient *sc;
430
431   if (NULL == socket)
432     return GNUNET_SYSERR;
433   sc = GNUNET_malloc (sizeof (struct StreamClient));
434   sc->socket = socket;
435   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
436                                       sc);
437   sc->rh = GNUNET_STREAM_read (sc->socket,
438                                GNUNET_TIME_UNIT_FOREVER_REL,
439                                &process_request,
440                                sc);
441   GNUNET_CONTAINER_DLL_insert (sc_head,
442                                sc_tail,
443                                sc);
444   return GNUNET_OK;
445 }
446
447
448 /**
449  * Initialize subsystem for non-anonymous file-sharing.
450  */
451 void
452 GSF_stream_start ()
453 {
454   listen_socket = GNUNET_STREAM_listen (GSF_cfg,
455                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
456                                         &accept_cb, NULL,
457                                         GNUNET_STREAM_OPTION_END);
458 }
459
460
461 /**
462  * Shutdown subsystem for non-anonymous file-sharing.
463  */
464 void
465 GSF_stream_stop ()
466 {
467   struct StreamClient *sc;
468
469   while (NULL != (sc = sc_head))
470     terminate_stream (sc);
471   if (NULL != listen_socket)
472   {
473     GNUNET_STREAM_listen_close (listen_socket);
474     listen_socket = NULL;
475   }
476 }
477
478 /* end of gnunet-service-fs_stream.c */