303c370db1c5037a563c0ec01e6149345ea8183f
[oweals/gnunet.git] / src / util / server.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file util/server.c
23  * @brief library for building GNUnet network servers
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix inefficient memmove in message processing
28  */
29
30 #include "platform.h"
31 #include "gnunet_common.h"
32 #include "gnunet_connection_lib.h"
33 #include "gnunet_scheduler_lib.h"
34 #include "gnunet_server_lib.h"
35 #include "gnunet_time_lib.h"
36 #include "gnunet_disk_lib.h"
37
38 #define DEBUG_SERVER GNUNET_NO
39
40 /**
41  * List of arrays of message handlers.
42  */
43 struct HandlerList
44 {
45   /**
46    * This is a linked list.
47    */
48   struct HandlerList *next;
49
50   /**
51    * NULL-terminated array of handlers.
52    */
53   const struct GNUNET_SERVER_MessageHandler *handlers;
54 };
55
56
57 /**
58  * List of arrays of message handlers.
59  */
60 struct NotifyList
61 {
62   /**
63    * This is a linked list.
64    */
65   struct NotifyList *next;
66
67   /**
68    * Function to call.
69    */
70   GNUNET_SERVER_DisconnectCallback callback;
71
72   /**
73    * Closure for callback.
74    */
75   void *callback_cls;
76 };
77
78
79 /**
80  * @brief handle for a server
81  */
82 struct GNUNET_SERVER_Handle
83 {
84   /**
85    * My scheduler.
86    */
87   struct GNUNET_SCHEDULER_Handle *sched;
88
89   /**
90    * List of handlers for incoming messages.
91    */
92   struct HandlerList *handlers;
93
94   /**
95    * List of our current clients.
96    */
97   struct GNUNET_SERVER_Client *clients;
98
99   /**
100    * Linked list of functions to call on disconnects by clients.
101    */
102   struct NotifyList *disconnect_notify_list;
103
104   /**
105    * Function to call for access control.
106    */
107   GNUNET_CONNECTION_AccessCheck access;
108
109   /**
110    * Closure for access.
111    */
112   void *access_cls;
113
114   /**
115    * After how long should an idle connection time
116    * out (on write).
117    */
118   struct GNUNET_TIME_Relative idle_timeout;
119
120   /**
121    * maximum write buffer size for accepted sockets
122    */
123   size_t maxbuf;
124
125   /**
126    * Pipe used to signal shutdown of the server.
127    */
128   struct GNUNET_DISK_PipeHandle *shutpipe;
129
130   /**
131    * Socket used to listen for new connections.  Set to
132    * "-1" by GNUNET_SERVER_destroy to initiate shutdown.
133    */
134   struct GNUNET_NETWORK_Handle *listen_socket;
135
136   /**
137    * Set to GNUNET_YES if we are shutting down.
138    */
139   int do_shutdown;
140
141   /**
142    * Do we ignore messages of types that we do not
143    * understand or do we require that a handler
144    * is found (and if not kill the connection)?
145    */
146   int require_found;
147
148 };
149
150
151 /**
152  * @brief handle for a client of the server
153  */
154 struct GNUNET_SERVER_Client
155 {
156
157   /**
158    * Size of the buffer for incoming data.  Should be
159    * first so we get nice alignment.
160    */
161   char incoming_buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE];
162
163   /**
164    * This is a linked list.
165    */
166   struct GNUNET_SERVER_Client *next;
167
168   /**
169    * Server that this client belongs to.
170    */
171   struct GNUNET_SERVER_Handle *server;
172
173   /**
174    * Client closure for callbacks.
175    */
176   void *client_closure;
177
178   /**
179    * Callback to receive from client.
180    */
181   GNUNET_SERVER_ReceiveCallback receive;
182
183   /**
184    * Callback to cancel receive from client.
185    */
186   GNUNET_SERVER_ReceiveCancelCallback receive_cancel;
187
188   /**
189    * Callback to ask about transmit-ready notification.
190    */
191   GNUNET_SERVER_TransmitReadyCallback notify_transmit_ready;
192
193    /**
194    * Callback to ask about transmit-ready notification.
195    */
196   GNUNET_SERVER_TransmitReadyCancelCallback notify_transmit_ready_cancel;
197
198   /**
199    * Callback to check if client is still valid.
200    */
201   GNUNET_SERVER_CheckCallback check;
202
203   /**
204    * Callback to destroy client.
205    */
206   GNUNET_SERVER_DestroyCallback destroy;
207
208   /**
209    * Side-buffer for incoming data used when processing
210    * is suspended.
211    */
212   char *side_buf;
213
214   /**
215    * Number of bytes in the side buffer.
216    */
217   size_t side_buf_size;
218
219   /**
220    * Last activity on this socket (used to time it out
221    * if reference_count == 0).
222    */
223   struct GNUNET_TIME_Absolute last_activity;
224
225   /**
226    * How many bytes in the "incoming_buffer" are currently
227    * valid? (starting at offset 0).
228    */
229   size_t receive_pos;
230
231   /**
232    * Number of external entities with a reference to
233    * this client object.
234    */
235   unsigned int reference_count;
236
237   /**
238    * Was processing if incoming messages suspended while
239    * we were still processing data already received?
240    * This is a counter saying how often processing was
241    * suspended (once per handler invoked).
242    */
243   unsigned int suspended;
244
245   /**
246    * Are we currently in the "process_client_buffer" function (and
247    * will hence restart the receive job on exit if suspended == 0 once
248    * we are done?).  If this is set, then "receive_done" will
249    * essentially only decrement suspended; if this is not set, then
250    * "receive_done" may need to restart the receive process (either
251    * from the side-buffer or via select/recv).
252    */
253   int in_process_client_buffer;
254
255   /**
256    * We're about to close down this client due to some serious
257    * error.
258    */
259   int shutdown_now;
260
261 };
262
263
264 /**
265  * Server has been asked to shutdown, free resources.
266  */
267 static void
268 destroy_server (struct GNUNET_SERVER_Handle *server)
269 {
270   struct GNUNET_SERVER_Client *pos;
271   struct HandlerList *hpos;
272   struct NotifyList *npos;
273
274 #if DEBUG_SERVER
275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
276               "Server shutting down.\n");
277 #endif
278   GNUNET_assert (server->listen_socket == NULL);
279   if (GNUNET_OK != GNUNET_DISK_pipe_close (server->shutpipe))
280     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
281                          "pipe-close");
282   while (server->clients != NULL)
283     {
284       pos = server->clients;
285       server->clients = pos->next;
286       pos->server = NULL;
287     }
288   while (NULL != (hpos = server->handlers))
289     {
290       server->handlers = hpos->next;
291       GNUNET_free (hpos);
292     }
293   while (NULL != (npos = server->disconnect_notify_list))
294     {
295       server->disconnect_notify_list = npos->next;
296       GNUNET_free (npos);
297     }
298   GNUNET_free (server);
299 }
300
301
302 /**
303  * Scheduler says our listen socket is ready.
304  * Process it!
305  */
306 static void
307 process_listen_socket (void *cls,
308                        const struct GNUNET_SCHEDULER_TaskContext *tc)
309 {
310   struct GNUNET_SERVER_Handle *server = cls;
311   struct GNUNET_CONNECTION_Handle *sock;
312   struct GNUNET_SERVER_Client *client;
313   struct GNUNET_NETWORK_FDSet *r;
314   const struct GNUNET_DISK_FileHandle *shutpipe;
315
316   if ((server->do_shutdown) ||
317       ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0))
318     {
319       /* shutdown was initiated */
320       GNUNET_assert (server->listen_socket != NULL);
321       GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (server->listen_socket));
322       server->listen_socket = NULL;
323       if (server->do_shutdown)
324         destroy_server (server);
325       return;
326     }
327   shutpipe = GNUNET_DISK_pipe_handle (server->shutpipe, 0);
328   GNUNET_assert (GNUNET_NETWORK_fdset_isset (tc->read_ready, server->listen_socket));
329   GNUNET_assert (!GNUNET_NETWORK_fdset_handle_isset (tc->read_ready, shutpipe));
330   sock = GNUNET_CONNECTION_create_from_accept (tc->sched,
331                                                    server->access,
332                                                    server->access_cls,
333                                                    server->listen_socket,
334                                                    server->maxbuf);
335   if (sock != NULL)
336     {
337 #if DEBUG_SERVER
338       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
339                   "Server accepted incoming connection.\n");
340 #endif
341       client = GNUNET_SERVER_connect_socket (server, sock);
342       /* decrement reference count, we don't keep "client" alive */
343       GNUNET_SERVER_client_drop (client);
344     }
345   /* listen for more! */
346   r = GNUNET_NETWORK_fdset_create ();
347   GNUNET_NETWORK_fdset_set (r, server->listen_socket);
348   GNUNET_NETWORK_fdset_handle_set (r, shutpipe);
349   GNUNET_SCHEDULER_add_select (server->sched,
350                                GNUNET_YES,
351                                GNUNET_SCHEDULER_PRIORITY_HIGH,
352                                GNUNET_SCHEDULER_NO_TASK,
353                                GNUNET_TIME_UNIT_FOREVER_REL,
354                                r, NULL,
355                                &process_listen_socket, server);
356   GNUNET_NETWORK_fdset_destroy (r);
357 }
358
359
360 /**
361  * Create and initialize a listen socket for the server.
362  *
363  * @return NULL on error, otherwise the listen socket
364  */
365 static struct GNUNET_NETWORK_Handle *
366 open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
367 {
368   const static int on = 1;
369   struct GNUNET_NETWORK_Handle *sock;
370   uint16_t port;
371
372   switch (serverAddr->sa_family)
373     {
374     case AF_INET:
375       port = ntohs (((const struct sockaddr_in *) serverAddr)->sin_port);
376       break;
377     case AF_INET6:
378       port = ntohs (((const struct sockaddr_in6 *) serverAddr)->sin6_port);
379       break;
380     default:
381       GNUNET_break (0);
382       return NULL;
383     }
384   sock = GNUNET_NETWORK_socket_socket (serverAddr->sa_family, SOCK_STREAM, 0);
385   if (NULL == sock)
386     {
387       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
388       return NULL;
389     }
390 #ifndef MINGW
391   if (GNUNET_OK != GNUNET_NETWORK_socket_set_inheritable (sock))
392     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
393                          "fcntl");
394 #endif
395   if (GNUNET_NETWORK_socket_setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) != GNUNET_OK)
396     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
397                          "setsockopt");
398   /* bind the socket */
399   if (GNUNET_NETWORK_socket_bind (sock, serverAddr, socklen) != GNUNET_OK)
400     {
401       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind");
402       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
403                   _
404                   ("`%s' failed for port %d. Is the service already running?\n"),
405                   "bind", port);
406       GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
407       return NULL;
408     }
409   if (GNUNET_OK != GNUNET_NETWORK_socket_listen (sock, 5))
410     {
411       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "listen");
412       GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
413       return NULL;
414     }
415 #if DEBUG_SERVER
416       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417                   "Server starts to listen on port %u.\n",
418                   port);
419 #endif
420   return sock;
421 }
422
423
424 /**
425  * Create a new server.
426  *
427  * @param sched scheduler to use
428  * @param access function for access control
429  * @param access_cls closure for access
430  * @param serverAddr address to listen on (including port), use NULL
431  *        for internal server (no listening)
432  * @param socklen length of serverAddr
433  * @param maxbuf maximum write buffer size for accepted sockets
434  * @param idle_timeout after how long should we timeout idle connections?
435  * @param require_found if YES, connections sending messages of unknown type
436  *        will be closed
437  * @return handle for the new server, NULL on error
438  *         (typically, "port" already in use)
439  */
440 struct GNUNET_SERVER_Handle *
441 GNUNET_SERVER_create (struct GNUNET_SCHEDULER_Handle *sched,
442                       GNUNET_CONNECTION_AccessCheck access,
443                       void *access_cls,
444                       const struct sockaddr *serverAddr,
445                       socklen_t socklen,
446                       size_t maxbuf,
447                       struct GNUNET_TIME_Relative
448                       idle_timeout, int require_found)
449 {
450   struct GNUNET_SERVER_Handle *ret;
451   struct GNUNET_NETWORK_Handle *lsock;
452   struct GNUNET_NETWORK_FDSet *r;
453
454   lsock = NULL;
455   if (serverAddr != NULL)
456     {
457       lsock = open_listen_socket (serverAddr, socklen);
458       if (lsock == NULL)
459         return NULL;
460     }
461   ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Handle));
462   ret->shutpipe = GNUNET_malloc (sizeof (struct GNUNET_DISK_FileDescriptor *[2]));
463   if (NULL == (ret->shutpipe = GNUNET_DISK_pipe (GNUNET_NO)))
464     {
465       GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (lsock));
466       GNUNET_free (ret->shutpipe);
467       GNUNET_free (ret);
468       return NULL;
469     }
470   ret->sched = sched;
471   ret->maxbuf = maxbuf;
472   ret->idle_timeout = idle_timeout;
473   ret->listen_socket = lsock;
474   ret->access = access;
475   ret->access_cls = access_cls;
476   ret->require_found = require_found;
477   if (lsock != NULL)
478     {
479       r = GNUNET_NETWORK_fdset_create ();
480       GNUNET_NETWORK_fdset_set (r, ret->listen_socket);
481       GNUNET_NETWORK_fdset_handle_set (r, GNUNET_DISK_pipe_handle (ret->shutpipe, 0));
482       GNUNET_SCHEDULER_add_select (sched,
483                                    GNUNET_YES,
484                                    GNUNET_SCHEDULER_PRIORITY_HIGH,
485                                    GNUNET_SCHEDULER_NO_TASK,
486                                    GNUNET_TIME_UNIT_FOREVER_REL,
487                                    r,
488                                    NULL, &process_listen_socket, ret);
489       GNUNET_NETWORK_fdset_destroy (r);
490     }
491   return ret;
492 }
493
494
495 /**
496  * Free resources held by this server.
497  */
498 void
499 GNUNET_SERVER_destroy (struct GNUNET_SERVER_Handle *s)
500 {
501   static char c;
502
503   GNUNET_assert (s->do_shutdown == GNUNET_NO);
504   s->do_shutdown = GNUNET_YES;
505   if (s->listen_socket == NULL)
506     destroy_server (s);
507   else
508     GNUNET_break (1 == GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle (s->shutpipe, 1), &c, 1));
509 }
510
511
512 /**
513  * Add additional handlers to an existing server.
514  *
515  * @param server the server to add handlers to
516  * @param handlers array of message handlers for
517  *        incoming messages; the last entry must
518  *        have "NULL" for the "callback"; multiple
519  *        entries for the same type are allowed,
520  *        they will be called in order of occurence.
521  *        These handlers can be removed later;
522  *        the handlers array must exist until removed
523  *        (or server is destroyed).
524  */
525 void
526 GNUNET_SERVER_add_handlers (struct GNUNET_SERVER_Handle *server,
527                             const struct GNUNET_SERVER_MessageHandler
528                             *handlers)
529 {
530   struct HandlerList *p;
531
532   p = GNUNET_malloc (sizeof (struct HandlerList));
533   p->handlers = handlers;
534   p->next = server->handlers;
535   server->handlers = p;
536 }
537
538
539 /**
540  * Inject a message into the server, pretend it came
541  * from the specified client.  Delivery of the message
542  * will happen instantly (if a handler is installed;
543  * otherwise the call does nothing).
544  *
545  * @param server the server receiving the message
546  * @param sender the "pretended" sender of the message
547  *        can be NULL!
548  * @param message message to transmit
549  * @return GNUNET_OK if the message was OK and the
550  *                   connection can stay open
551  *         GNUNET_SYSERR if the connection to the
552  *         client should be shut down
553  */
554 int
555 GNUNET_SERVER_inject (struct GNUNET_SERVER_Handle *server,
556                       struct GNUNET_SERVER_Client *sender,
557                       const struct GNUNET_MessageHeader *message)
558 {
559   struct HandlerList *pos;
560   const struct GNUNET_SERVER_MessageHandler *mh;
561   unsigned int i;
562   uint16_t type;
563   uint16_t size;
564   int found;
565
566   type = ntohs (message->type);
567   size = ntohs (message->size);
568 #if DEBUG_SERVER
569   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570               "Server schedules transmission of %u-byte message of type %u to client.\n",
571               size,
572               type);
573 #endif
574   pos = server->handlers;
575   found = GNUNET_NO;
576   while (pos != NULL)
577     {
578       i = 0;
579       while (pos->handlers[i].callback != NULL)
580         {
581           mh = &pos->handlers[i];
582           if (mh->type == type)
583             {
584               if ((mh->expected_size != 0) && (mh->expected_size != size))
585                 {
586                   GNUNET_break_op (0);
587                   return GNUNET_SYSERR;
588                 }
589               if (sender != NULL)
590                 sender->suspended++;
591               mh->callback (mh->callback_cls, sender, message);
592               found = GNUNET_YES;
593             }
594           i++;
595         }
596       pos = pos->next;
597     }
598   if (found == GNUNET_NO)
599     {
600       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
601                   _("Received message of unknown type %d\n"), type);
602       if (server->require_found == GNUNET_YES)
603         return GNUNET_SYSERR;
604     }
605   return GNUNET_OK;
606 }
607
608
609 /**
610  * We're finished with this client and especially its input
611  * processing.  If the RC is zero, free all resources otherwise wait
612  * until RC hits zero to do so.
613  */
614 static void
615 shutdown_incoming_processing (struct GNUNET_SERVER_Client *client)
616 {
617   struct GNUNET_SERVER_Client *prev;
618   struct GNUNET_SERVER_Client *pos;
619   struct GNUNET_SERVER_Handle *server;
620   struct NotifyList *n;
621   unsigned int rc;
622
623   rc = client->reference_count;
624   if (client->server != NULL)
625     {
626       server = client->server;
627       client->server = NULL;
628       prev = NULL;
629       pos = server->clients;
630       while ((pos != NULL) && (pos != client))
631         {
632           prev = pos;
633           pos = pos->next;
634         }
635       GNUNET_assert (pos != NULL);
636       if (prev == NULL)
637         server->clients = pos->next;
638       else
639         prev->next = pos->next;
640       n = server->disconnect_notify_list;
641       while (n != NULL)
642         {
643           n->callback (n->callback_cls, client);
644           n = n->next;
645         }
646     }
647   /* wait for RC to hit zero, then free */
648   if (rc > 0)
649     return;
650   client->destroy (client->client_closure);
651   GNUNET_free (client);
652 }
653
654
655 /**
656  * Go over the contents of the client buffer; as long as full messages
657  * are available, pass them on for processing.  Update the buffer
658  * accordingly.  Handles fatal errors by shutting down the connection.
659  *
660  * @param client identifies which client receive buffer to process
661  */
662 static void
663 process_client_buffer (struct GNUNET_SERVER_Client *client)
664 {
665   struct GNUNET_SERVER_Handle *server;
666   const struct GNUNET_MessageHeader *hdr;
667   size_t msize;
668
669   client->in_process_client_buffer = GNUNET_YES;
670   server = client->server;
671 #if DEBUG_SERVER
672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673               "Private buffer contains %u bytes; client is %s and we are %s\n",
674               client->receive_pos,
675               client->suspended ? "suspended" : "up",
676               client->shutdown_now ? "in shutdown" : "running");
677 #endif
678   while ((client->receive_pos >= sizeof (struct GNUNET_MessageHeader)) &&
679          (0 == client->suspended) && (GNUNET_YES != client->shutdown_now))
680     {
681       hdr = (const struct GNUNET_MessageHeader *) &client->incoming_buffer;
682       msize = ntohs (hdr->size);
683       if (msize > client->receive_pos)
684         {
685 #if DEBUG_SERVER
686           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687                       "Total message size is %u, we only have %u bytes; need more data\n",
688                       msize,
689                       client->receive_pos);
690 #endif
691           break;
692         }
693 #if DEBUG_SERVER
694       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695                   "Passing %u bytes to callback for processing\n",
696                   msize);
697 #endif
698       if ((msize < sizeof (struct GNUNET_MessageHeader)) ||
699           (GNUNET_OK != GNUNET_SERVER_inject (server, client, hdr)))
700         {
701           client->in_process_client_buffer = GNUNET_NO;
702           shutdown_incoming_processing (client);
703           return;
704         }
705       /* FIXME: this is highly inefficient; we should
706          try to avoid this if the new base address is
707          already nicely aligned.  See old handler code... */
708       memmove (client->incoming_buffer,
709                &client->incoming_buffer[msize], client->receive_pos - msize);
710       client->receive_pos -= msize;
711     }
712   client->in_process_client_buffer = GNUNET_NO;
713   if (GNUNET_YES == client->shutdown_now)
714     shutdown_incoming_processing (client);
715 }
716
717
718 /**
719  * We are receiving an incoming message.  Process it.
720  *
721  * @param cls our closure (handle for the client)
722  * @param buf buffer with data received from network
723  * @param available number of bytes available in buf
724  * @param addr address of the sender
725  * @param addrlen length of addr
726  * @param errCode code indicating errors receiving, 0 for success
727  */
728 static void
729 process_incoming (void *cls,
730                   const void *buf,
731                   size_t available,
732                   const struct sockaddr *addr, 
733                   socklen_t addrlen,
734                   int errCode)
735 {
736   struct GNUNET_SERVER_Client *client = cls;
737   struct GNUNET_SERVER_Handle *server = client->server;
738   const char *cbuf = buf;
739   size_t maxcpy;
740
741   if ((buf == NULL) ||
742       (available == 0) ||
743       (errCode != 0) ||
744       (server == NULL) ||
745       (client->shutdown_now == GNUNET_YES) ||
746       (GNUNET_YES != client->check (client->client_closure)))
747     {
748       /* other side closed connection, error connecting, etc. */
749       shutdown_incoming_processing (client);
750       return;
751     }
752 #if DEBUG_SERVER
753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
754               "Server receives %u bytes from `%s'.\n",
755               available,
756               GNUNET_a2s(addr, addrlen));
757 #endif
758   GNUNET_SERVER_client_keep (client);
759   client->last_activity = GNUNET_TIME_absolute_get ();
760   /* process data (if available) */
761   while (available > 0)
762     {
763       maxcpy = available;
764       if (maxcpy > sizeof (client->incoming_buffer) - client->receive_pos)
765         maxcpy = sizeof (client->incoming_buffer) - client->receive_pos;
766 #if DEBUG_SERVER
767       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
768                   "Can copy %u bytes to private buffer\n",
769                   maxcpy);
770 #endif
771       memcpy (&client->incoming_buffer[client->receive_pos], cbuf, maxcpy);
772       client->receive_pos += maxcpy;
773       cbuf += maxcpy;
774       available -= maxcpy;
775       if (0 < client->suspended)
776         {
777           if (available > 0)
778             {
779 #if DEBUG_SERVER
780               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781                           "Client has suspended processing; copying %u bytes to side buffer to be used later.\n",
782                           available);
783 #endif
784               GNUNET_assert (client->side_buf_size == 0);
785               GNUNET_assert (client->side_buf == NULL);
786               client->side_buf_size = available;
787               client->side_buf = GNUNET_malloc (available);
788               memcpy (client->side_buf, cbuf, available);
789               available = 0;
790             }
791           break;                /* do not run next client iteration! */
792         }
793 #if DEBUG_SERVER
794       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795                   "Now processing messages in private buffer\n");
796 #endif
797       process_client_buffer (client);
798     }
799   GNUNET_assert (available == 0);
800   if ((client->suspended == 0) &&
801       (GNUNET_YES != client->shutdown_now) && (client->server != NULL))
802     {
803       /* Finally, keep receiving! */
804       client->receive (client->client_closure,
805                        GNUNET_SERVER_MAX_MESSAGE_SIZE,
806                        server->idle_timeout,
807                        &process_incoming, client);
808     }
809   if (GNUNET_YES == client->shutdown_now)
810     shutdown_incoming_processing (client);
811   GNUNET_SERVER_client_drop (client);
812 }
813
814
815 /**
816  * FIXME: document.
817  */
818 static void
819 restart_processing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
820 {
821   struct GNUNET_SERVER_Client *client = cls;
822
823   process_client_buffer (client);
824   if (0 == client->suspended)
825     client->receive (client->client_closure,
826                      GNUNET_SERVER_MAX_MESSAGE_SIZE,
827                      client->server->idle_timeout,
828                      &process_incoming, client);
829 }
830
831
832 /**
833  * Add a client to the set of our clients and
834  * start receiving.
835  */
836 static void
837 add_client (struct GNUNET_SERVER_Handle *server,
838             struct GNUNET_SERVER_Client *client)
839 {
840   client->server = server;
841   client->last_activity = GNUNET_TIME_absolute_get ();
842   client->next = server->clients;
843   server->clients = client;
844   client->receive (client->client_closure,
845                    GNUNET_SERVER_MAX_MESSAGE_SIZE,
846                    server->idle_timeout,
847                    &process_incoming, client);
848 }
849
850
851 /**
852  * Create a request for receiving data from a socket.
853  *
854  * @param cls identifies the socket to receive from
855  * @param max how much data to read at most
856  * @param timeout when should this operation time out
857  * @param receiver function to call for processing
858  * @param receiver_cls closure for receiver
859  */
860 static void
861 sock_receive (void *cls,
862               size_t max,
863               struct GNUNET_TIME_Relative timeout,
864               GNUNET_CONNECTION_Receiver receiver, void *receiver_cls)
865 {
866   GNUNET_CONNECTION_receive (cls, max, timeout, receiver, receiver_cls);
867 }
868
869
870 /**
871  * Wrapper to cancel receiving from a socket.
872  * 
873  * @param cls handle to the GNUNET_CONNECTION_Handle to cancel
874  */
875 static void
876 sock_receive_cancel (void *cls)
877 {
878   GNUNET_CONNECTION_receive_cancel (cls);
879 }
880
881
882 /**
883  * FIXME: document.
884  */
885 static void *
886 sock_notify_transmit_ready (void *cls,
887                             size_t size,
888                             struct GNUNET_TIME_Relative timeout,
889                             GNUNET_CONNECTION_TransmitReadyNotify notify,
890                             void *notify_cls)
891 {
892   return GNUNET_CONNECTION_notify_transmit_ready (cls, size, timeout, notify,
893                                                notify_cls);
894 }
895
896
897 /**
898  * FIXME: document.
899  */
900 static void
901 sock_notify_transmit_ready_cancel (void *cls, void *h)
902 {
903   GNUNET_CONNECTION_notify_transmit_ready_cancel (h);
904 }
905
906
907 /**
908  * Check if socket is still valid (no fatal errors have happened so far).
909  *
910  * @param cls the socket
911  * @return GNUNET_YES if valid, GNUNET_NO otherwise
912  */
913 static int
914 sock_check (void *cls)
915 {
916   return GNUNET_CONNECTION_check (cls);
917 }
918
919
920 /**
921  * Destroy this socket (free resources).
922  *
923  * @param cls the socket
924  */
925 static void
926 sock_destroy (void *cls)
927 {
928   GNUNET_CONNECTION_destroy (cls);
929 }
930
931
932 /**
933  * Add a TCP socket-based connection to the set of handles managed by
934  * this server.  Use this function for outgoing (P2P) connections that
935  * we initiated (and where this server should process incoming
936  * messages).
937  *
938  * @param server the server to use
939  * @param connection the connection to manage (client must
940  *        stop using this connection from now on)
941  * @return the client handle (client should call
942  *         "client_drop" on the return value eventually)
943  */
944 struct GNUNET_SERVER_Client *
945 GNUNET_SERVER_connect_socket (struct
946                               GNUNET_SERVER_Handle
947                               *server,
948                               struct GNUNET_CONNECTION_Handle *connection)
949 {
950   struct GNUNET_SERVER_Client *client;
951
952   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
953   client->client_closure = connection;
954   client->receive = &sock_receive;
955   client->receive_cancel = &sock_receive_cancel;
956   client->notify_transmit_ready = &sock_notify_transmit_ready;
957   client->notify_transmit_ready_cancel = &sock_notify_transmit_ready_cancel;
958   client->check = &sock_check;
959   client->destroy = &sock_destroy;
960   client->reference_count = 1;
961   add_client (server, client);
962   return client;
963 }
964
965
966 /**
967  * Add an arbitrary connection to the set of handles managed by this
968  * server.  This can be used if a sending and receiving does not
969  * really go over the network (internal transmission) or for servers
970  * using UDP.
971  *
972  * @param server the server to use
973  * @param chandle opaque handle for the connection
974  * @param creceive receive function for the connection
975  * @param ccancel cancel receive function for the connection
976  * @param cnotify transmit notification function for the connection
977  * @param cnotify_cancel transmit notification cancellation function for the connection
978  * @param ccheck function to test if the connection is still up
979  * @param cdestroy function to close and free the connection
980  * @return the client handle (client should call
981  *         "client_drop" on the return value eventually)
982  */
983 struct GNUNET_SERVER_Client *
984 GNUNET_SERVER_connect_callback (struct
985                                 GNUNET_SERVER_Handle
986                                 *server,
987                                 void *chandle,
988                                 GNUNET_SERVER_ReceiveCallback
989                                 creceive,
990                                 GNUNET_SERVER_ReceiveCancelCallback
991                                 ccancel,
992                                 GNUNET_SERVER_TransmitReadyCallback
993                                 cnotify,
994                                 GNUNET_SERVER_TransmitReadyCancelCallback
995                                 cnotify_cancel,
996                                 GNUNET_SERVER_CheckCallback
997                                 ccheck,
998                                 GNUNET_SERVER_DestroyCallback cdestroy)
999 {
1000   struct GNUNET_SERVER_Client *client;
1001
1002   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
1003   client->client_closure = chandle;
1004   client->receive = creceive;
1005   client->receive_cancel = ccancel;
1006   client->notify_transmit_ready = cnotify;
1007   client->notify_transmit_ready_cancel = cnotify_cancel;
1008   client->check = ccheck;
1009   client->destroy = cdestroy;
1010   client->reference_count = 1;
1011   add_client (server, client);
1012   return client;
1013 }
1014
1015
1016 /**
1017  * Notify the server that the given client handle should
1018  * be kept (keeps the connection up if possible, increments
1019  * the internal reference counter).
1020  *
1021  * @param client the client to keep
1022  */
1023 void
1024 GNUNET_SERVER_client_keep (struct GNUNET_SERVER_Client *client)
1025 {
1026   client->reference_count++;
1027 }
1028
1029
1030 /**
1031  * Notify the server that the given client handle is no
1032  * longer required.  Decrements the reference counter.  If
1033  * that counter reaches zero an inactive connection maybe
1034  * closed.
1035  *
1036  * @param client the client to drop
1037  */
1038 void
1039 GNUNET_SERVER_client_drop (struct GNUNET_SERVER_Client *client)
1040 {
1041   GNUNET_assert (client->reference_count > 0);
1042   client->reference_count--;
1043   if ((client->server == NULL) && (client->reference_count == 0))
1044     shutdown_incoming_processing (client);
1045 }
1046
1047
1048 /**
1049  * Obtain the network address of the other party.
1050  *
1051  * @param client the client to get the address for
1052  * @param addr where to store the address
1053  * @param addrlen where to store the length of the address
1054  * @return GNUNET_OK on success
1055  */
1056 int
1057 GNUNET_SERVER_client_get_address (struct GNUNET_SERVER_Client *client,
1058                                   void **addr, size_t * addrlen)
1059 {
1060   if (client->receive != &sock_receive)
1061     return GNUNET_SYSERR;       /* not a network client */
1062   return GNUNET_CONNECTION_get_address (client->client_closure,
1063                                             addr, addrlen);
1064 }
1065
1066
1067 /**
1068  * Ask the server to notify us whenever a client disconnects.
1069  * This function is called whenever the actual network connection
1070  * is closed; the reference count may be zero or larger than zero
1071  * at this point.
1072  *
1073  * @param server the server manageing the clients
1074  * @param callback function to call on disconnect
1075  * @param callback_cls closure for callback
1076  */
1077 void
1078 GNUNET_SERVER_disconnect_notify (struct GNUNET_SERVER_Handle *server,
1079                                  GNUNET_SERVER_DisconnectCallback callback,
1080                                  void *callback_cls)
1081 {
1082   struct NotifyList *n;
1083
1084   n = GNUNET_malloc (sizeof (struct NotifyList));
1085   n->callback = callback;
1086   n->callback_cls = callback_cls;
1087   n->next = server->disconnect_notify_list;
1088   server->disconnect_notify_list = n;
1089 }
1090
1091
1092 /**
1093  * Ask the server to disconnect from the given client.
1094  * This is the same as returning GNUNET_SYSERR from a message
1095  * handler, except that it allows dropping of a client even
1096  * when not handling a message from that client.
1097  *
1098  * @param client the client to disconnect from
1099  */
1100 void
1101 GNUNET_SERVER_client_disconnect (struct GNUNET_SERVER_Client *client)
1102 {
1103   if (client->server == NULL)
1104     return;                     /* already disconnected */
1105   client->receive_cancel (client->client_closure);
1106   shutdown_incoming_processing (client);
1107 }
1108
1109
1110 /**
1111  * Notify us when the server has enough space to transmit
1112  * a message of the given size to the given client.
1113  *
1114  * @param client client to transmit message to
1115  * @param size requested amount of buffer space
1116  * @param timeout after how long should we give up (and call
1117  *        notify with buf NULL and size 0)?
1118  * @param callback function to call when space is available
1119  * @param callback_cls closure for callback
1120  * @return non-NULL if the notify callback was queued; can be used
1121  *           to cancel the request using
1122  *           GNUNET_CONNECTION_notify_transmit_ready_cancel.
1123  *         NULL if we are already going to notify someone else (busy)
1124  */
1125 struct GNUNET_CONNECTION_TransmitHandle *
1126 GNUNET_SERVER_notify_transmit_ready (struct GNUNET_SERVER_Client *client,
1127                                      size_t size,
1128                                      struct GNUNET_TIME_Relative timeout,
1129                                      GNUNET_CONNECTION_TransmitReadyNotify
1130                                      callback, void *callback_cls)
1131 {
1132   return client->notify_transmit_ready (client->client_closure,
1133                                         size,
1134                                         timeout, callback, callback_cls);
1135 }
1136
1137
1138 /**
1139  * Resume receiving from this client, we are done processing the
1140  * current request.  This function must be called from within each
1141  * GNUNET_SERVER_MessageCallback (or its respective continuations).
1142  *
1143  * @param client client we were processing a message of
1144  * @param success GNUNET_OK to keep the connection open and
1145  *                          continue to receive
1146  *                GNUNET_SYSERR to close the connection (signal
1147  *                          serious error)
1148  */
1149 void
1150 GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
1151 {
1152   char *sb;
1153
1154   if (client == NULL)
1155     return;
1156   GNUNET_assert (client->suspended > 0);
1157   client->suspended--;
1158   if (success != GNUNET_OK)
1159     client->shutdown_now = GNUNET_YES;
1160   if (client->suspended > 0)
1161     return;
1162   if (client->in_process_client_buffer == GNUNET_YES)
1163     return;
1164   if (client->side_buf_size > 0)
1165     {
1166       /* resume processing from side-buf */
1167       sb = client->side_buf;
1168       client->side_buf = NULL;
1169       /* this will also resume the receive job */
1170       if (GNUNET_YES != client->shutdown_now)
1171         process_incoming (client, sb, client->side_buf_size, NULL, 0, 0);
1172       else
1173         shutdown_incoming_processing (client);
1174       /* finally, free the side-buf */
1175       GNUNET_free (sb);
1176       return;
1177     }
1178   /* resume receive job */
1179   if (GNUNET_YES != client->shutdown_now)
1180     {
1181       GNUNET_SCHEDULER_add_continuation (client->server->sched,
1182                                          GNUNET_NO,
1183                                          &restart_processing,
1184                                          client,
1185                                          GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1186       return;
1187     }
1188   shutdown_incoming_processing (client);
1189 }
1190
1191
1192 /* end of server.c */