ff9c8c192bfddf71b9e8e8acb618e517c7ef23b6
[oweals/gnunet.git] / src / util / server.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file util/server.c
23  * @brief library for building GNUnet network servers
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix inefficient memmove in message processing
28  */
29
30 #include "platform.h"
31 #include "gnunet_common.h"
32 #include "gnunet_connection_lib.h"
33 #include "gnunet_scheduler_lib.h"
34 #include "gnunet_server_lib.h"
35 #include "gnunet_time_lib.h"
36 #include "gnunet_disk_lib.h"
37
38 #define DEBUG_SERVER GNUNET_NO
39
40 /**
41  * List of arrays of message handlers.
42  */
43 struct HandlerList
44 {
45   /**
46    * This is a linked list.
47    */
48   struct HandlerList *next;
49
50   /**
51    * NULL-terminated array of handlers.
52    */
53   const struct GNUNET_SERVER_MessageHandler *handlers;
54 };
55
56
57 /**
58  * List of arrays of message handlers.
59  */
60 struct NotifyList
61 {
62   /**
63    * This is a linked list.
64    */
65   struct NotifyList *next;
66
67   /**
68    * Function to call.
69    */
70   GNUNET_SERVER_DisconnectCallback callback;
71
72   /**
73    * Closure for callback.
74    */
75   void *callback_cls;
76 };
77
78
79 /**
80  * @brief handle for a server
81  */
82 struct GNUNET_SERVER_Handle
83 {
84   /**
85    * My scheduler.
86    */
87   struct GNUNET_SCHEDULER_Handle *sched;
88
89   /**
90    * List of handlers for incoming messages.
91    */
92   struct HandlerList *handlers;
93
94   /**
95    * List of our current clients.
96    */
97   struct GNUNET_SERVER_Client *clients;
98
99   /**
100    * Linked list of functions to call on disconnects by clients.
101    */
102   struct NotifyList *disconnect_notify_list;
103
104   /**
105    * Function to call for access control.
106    */
107   GNUNET_CONNECTION_AccessCheck access;
108
109   /**
110    * Closure for access.
111    */
112   void *access_cls;
113
114   /**
115    * After how long should an idle connection time
116    * out (on write).
117    */
118   struct GNUNET_TIME_Relative idle_timeout;
119
120   /**
121    * maximum write buffer size for accepted sockets
122    */
123   size_t maxbuf;
124
125   /**
126    * Pipe used to signal shutdown of the server.
127    */
128   struct GNUNET_DISK_PipeHandle *shutpipe;
129
130   /**
131    * Socket used to listen for new connections.  Set to
132    * "-1" by GNUNET_SERVER_destroy to initiate shutdown.
133    */
134   struct GNUNET_NETWORK_Handle *listen_socket;
135
136   /**
137    * Set to GNUNET_YES if we are shutting down.
138    */
139   int do_shutdown;
140
141   /**
142    * Do we ignore messages of types that we do not
143    * understand or do we require that a handler
144    * is found (and if not kill the connection)?
145    */
146   int require_found;
147
148 };
149
150
151 /**
152  * @brief handle for a client of the server
153  */
154 struct GNUNET_SERVER_Client
155 {
156
157   /**
158    * Size of the buffer for incoming data.  Should be
159    * first so we get nice alignment.
160    */
161   char incoming_buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE];
162
163   /**
164    * This is a linked list.
165    */
166   struct GNUNET_SERVER_Client *next;
167
168   /**
169    * Server that this client belongs to.
170    */
171   struct GNUNET_SERVER_Handle *server;
172
173   /**
174    * Client closure for callbacks.
175    */
176   void *client_closure;
177
178   /**
179    * Callback to receive from client.
180    */
181   GNUNET_SERVER_ReceiveCallback receive;
182
183   /**
184    * Callback to cancel receive from client.
185    */
186   GNUNET_SERVER_ReceiveCancelCallback receive_cancel;
187
188   /**
189    * Callback to ask about transmit-ready notification.
190    */
191   GNUNET_SERVER_TransmitReadyCallback notify_transmit_ready;
192
193    /**
194    * Callback to ask about transmit-ready notification.
195    */
196   GNUNET_SERVER_TransmitReadyCancelCallback notify_transmit_ready_cancel;
197
198   /**
199    * Callback to check if client is still valid.
200    */
201   GNUNET_SERVER_CheckCallback check;
202
203   /**
204    * Callback to destroy client.
205    */
206   GNUNET_SERVER_DestroyCallback destroy;
207
208   /**
209    * Side-buffer for incoming data used when processing
210    * is suspended.
211    */
212   char *side_buf;
213
214   /**
215    * Number of bytes in the side buffer.
216    */
217   size_t side_buf_size;
218
219   /**
220    * Last activity on this socket (used to time it out
221    * if reference_count == 0).
222    */
223   struct GNUNET_TIME_Absolute last_activity;
224
225   /**
226    * How many bytes in the "incoming_buffer" are currently
227    * valid? (starting at offset 0).
228    */
229   size_t receive_pos;
230
231   /**
232    * Number of external entities with a reference to
233    * this client object.
234    */
235   unsigned int reference_count;
236
237   /**
238    * Was processing if incoming messages suspended while
239    * we were still processing data already received?
240    * This is a counter saying how often processing was
241    * suspended (once per handler invoked).
242    */
243   unsigned int suspended;
244
245   /**
246    * Are we currently in the "process_client_buffer" function (and
247    * will hence restart the receive job on exit if suspended == 0 once
248    * we are done?).  If this is set, then "receive_done" will
249    * essentially only decrement suspended; if this is not set, then
250    * "receive_done" may need to restart the receive process (either
251    * from the side-buffer or via select/recv).
252    */
253   int in_process_client_buffer;
254
255   /**
256    * We're about to close down this client due to some serious
257    * error.
258    */
259   int shutdown_now;
260
261 };
262
263
264 /**
265  * Server has been asked to shutdown, free resources.
266  */
267 static void
268 destroy_server (struct GNUNET_SERVER_Handle *server)
269 {
270   struct GNUNET_SERVER_Client *pos;
271   struct HandlerList *hpos;
272   struct NotifyList *npos;
273
274 #if DEBUG_SERVER
275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server shutting down.\n");
276 #endif
277   GNUNET_assert (server->listen_socket == NULL);
278   if (GNUNET_OK != GNUNET_DISK_pipe_close (server->shutpipe))
279     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "pipe-close");
280   while (server->clients != NULL)
281     {
282       pos = server->clients;
283       server->clients = pos->next;
284       pos->server = NULL;
285     }
286   while (NULL != (hpos = server->handlers))
287     {
288       server->handlers = hpos->next;
289       GNUNET_free (hpos);
290     }
291   while (NULL != (npos = server->disconnect_notify_list))
292     {
293       server->disconnect_notify_list = npos->next;
294       GNUNET_free (npos);
295     }
296   GNUNET_free (server);
297 }
298
299
300 /**
301  * Scheduler says our listen socket is ready.
302  * Process it!
303  */
304 static void
305 process_listen_socket (void *cls,
306                        const struct GNUNET_SCHEDULER_TaskContext *tc)
307 {
308   struct GNUNET_SERVER_Handle *server = cls;
309   struct GNUNET_CONNECTION_Handle *sock;
310   struct GNUNET_SERVER_Client *client;
311   struct GNUNET_NETWORK_FDSet *r;
312   const struct GNUNET_DISK_FileHandle *shutpipe;
313
314   if ((server->do_shutdown) ||
315       ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0))
316     {
317       /* shutdown was initiated */
318       GNUNET_assert (server->listen_socket != NULL);
319       GNUNET_break (GNUNET_OK ==
320                     GNUNET_NETWORK_socket_close (server->listen_socket));
321       server->listen_socket = NULL;
322       if (server->do_shutdown)
323         destroy_server (server);
324       return;
325     }
326   shutpipe = GNUNET_DISK_pipe_handle (server->shutpipe,
327                                       GNUNET_DISK_PIPE_END_READ);
328   GNUNET_assert (GNUNET_NETWORK_fdset_isset
329                  (tc->read_ready, server->listen_socket));
330   GNUNET_assert (!GNUNET_NETWORK_fdset_handle_isset
331                  (tc->read_ready, shutpipe));
332   sock =
333     GNUNET_CONNECTION_create_from_accept (tc->sched, server->access,
334                                           server->access_cls,
335                                           server->listen_socket,
336                                           server->maxbuf);
337   if (sock != NULL)
338     {
339 #if DEBUG_SERVER
340       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341                   "Server accepted incoming connection.\n");
342 #endif
343       client = GNUNET_SERVER_connect_socket (server, sock);
344       /* decrement reference count, we don't keep "client" alive */
345       GNUNET_SERVER_client_drop (client);
346     }
347   /* listen for more! */
348   r = GNUNET_NETWORK_fdset_create ();
349   GNUNET_NETWORK_fdset_set (r, server->listen_socket);
350   GNUNET_NETWORK_fdset_handle_set (r, shutpipe);
351   GNUNET_SCHEDULER_add_select (server->sched,
352                                GNUNET_SCHEDULER_PRIORITY_HIGH,
353                                GNUNET_SCHEDULER_NO_TASK,
354                                GNUNET_TIME_UNIT_FOREVER_REL,
355                                r, NULL, &process_listen_socket, server);
356   GNUNET_NETWORK_fdset_destroy (r);
357 }
358
359
360 /**
361  * Create and initialize a listen socket for the server.
362  *
363  * @return NULL on error, otherwise the listen socket
364  */
365 static struct GNUNET_NETWORK_Handle *
366 open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
367 {
368   const static int on = 1;
369   struct GNUNET_NETWORK_Handle *sock;
370   uint16_t port;
371
372   switch (serverAddr->sa_family)
373     {
374     case AF_INET:
375       port = ntohs (((const struct sockaddr_in *) serverAddr)->sin_port);
376       break;
377     case AF_INET6:
378       port = ntohs (((const struct sockaddr_in6 *) serverAddr)->sin6_port);
379       break;
380     default:
381       GNUNET_break (0);
382       return NULL;
383     }
384   sock = GNUNET_NETWORK_socket_create (serverAddr->sa_family, SOCK_STREAM, 0);
385   if (NULL == sock)
386     {
387       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
388       return NULL;
389     }
390   if (GNUNET_NETWORK_socket_setsockopt
391       (sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) != GNUNET_OK)
392     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
393                          "setsockopt");
394   /* bind the socket */
395   if (GNUNET_NETWORK_socket_bind (sock, serverAddr, socklen) != GNUNET_OK)
396     {
397       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind");
398       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
399                   _
400                   ("`%s' failed for port %d. Is the service already running?\n"),
401                   "bind", port);
402       GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
403       return NULL;
404     }
405   if (GNUNET_OK != GNUNET_NETWORK_socket_listen (sock, 5))
406     {
407       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "listen");
408       GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
409       return NULL;
410     }
411 #if DEBUG_SERVER
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "Server starts to listen on port %u.\n", port);
414 #endif
415   return sock;
416 }
417
418
419 /**
420  * Create a new server.
421  *
422  * @param sched scheduler to use
423  * @param access function for access control
424  * @param access_cls closure for access
425  * @param serverAddr address to listen on (including port), use NULL
426  *        for internal server (no listening)
427  * @param socklen length of serverAddr
428  * @param maxbuf maximum write buffer size for accepted sockets
429  * @param idle_timeout after how long should we timeout idle connections?
430  * @param require_found if YES, connections sending messages of unknown type
431  *        will be closed
432  * @return handle for the new server, NULL on error
433  *         (typically, "port" already in use)
434  */
435 struct GNUNET_SERVER_Handle *
436 GNUNET_SERVER_create (struct GNUNET_SCHEDULER_Handle *sched,
437                       GNUNET_CONNECTION_AccessCheck access,
438                       void *access_cls,
439                       const struct sockaddr *serverAddr,
440                       socklen_t socklen,
441                       size_t maxbuf,
442                       struct GNUNET_TIME_Relative
443                       idle_timeout, int require_found)
444 {
445   struct GNUNET_SERVER_Handle *ret;
446   struct GNUNET_NETWORK_Handle *lsock;
447   struct GNUNET_NETWORK_FDSet *r;
448
449   lsock = NULL;
450   if (serverAddr != NULL)
451     {
452       lsock = open_listen_socket (serverAddr, socklen);
453       if (lsock == NULL)
454         return NULL;
455     }
456   ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Handle));
457   if (NULL == (ret->shutpipe = GNUNET_DISK_pipe (GNUNET_NO)))
458     {
459       GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (lsock));
460       GNUNET_free (ret);
461       return NULL;
462     }
463   ret->sched = sched;
464   ret->maxbuf = maxbuf;
465   ret->idle_timeout = idle_timeout;
466   ret->listen_socket = lsock;
467   ret->access = access;
468   ret->access_cls = access_cls;
469   ret->require_found = require_found;
470   if (lsock != NULL)
471     {
472       r = GNUNET_NETWORK_fdset_create ();
473       GNUNET_NETWORK_fdset_set (r, ret->listen_socket);
474       GNUNET_NETWORK_fdset_handle_set (r,
475                                        GNUNET_DISK_pipe_handle (ret->shutpipe,
476                                                                 GNUNET_DISK_PIPE_END_READ));
477       GNUNET_SCHEDULER_add_select (sched, 
478                                    GNUNET_SCHEDULER_PRIORITY_HIGH,
479                                    GNUNET_SCHEDULER_NO_TASK,
480                                    GNUNET_TIME_UNIT_FOREVER_REL, r, NULL,
481                                    &process_listen_socket, ret);
482       GNUNET_NETWORK_fdset_destroy (r);
483     }
484   return ret;
485 }
486
487
488 /**
489  * Free resources held by this server.
490  */
491 void
492 GNUNET_SERVER_destroy (struct GNUNET_SERVER_Handle *s)
493 {
494   static char c;
495
496   GNUNET_assert (s->do_shutdown == GNUNET_NO);
497   s->do_shutdown = GNUNET_YES;
498   if (s->listen_socket == NULL)
499     destroy_server (s);
500   else
501     GNUNET_break (1 ==
502                   GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
503                                           (s->shutpipe,
504                                            GNUNET_DISK_PIPE_END_WRITE), &c,
505                                           sizeof (c)));
506 }
507
508
509 /**
510  * Add additional handlers to an existing server.
511  *
512  * @param server the server to add handlers to
513  * @param handlers array of message handlers for
514  *        incoming messages; the last entry must
515  *        have "NULL" for the "callback"; multiple
516  *        entries for the same type are allowed,
517  *        they will be called in order of occurence.
518  *        These handlers can be removed later;
519  *        the handlers array must exist until removed
520  *        (or server is destroyed).
521  */
522 void
523 GNUNET_SERVER_add_handlers (struct GNUNET_SERVER_Handle *server,
524                             const struct GNUNET_SERVER_MessageHandler
525                             *handlers)
526 {
527   struct HandlerList *p;
528
529   p = GNUNET_malloc (sizeof (struct HandlerList));
530   p->handlers = handlers;
531   p->next = server->handlers;
532   server->handlers = p;
533 }
534
535
536 /**
537  * Inject a message into the server, pretend it came
538  * from the specified client.  Delivery of the message
539  * will happen instantly (if a handler is installed;
540  * otherwise the call does nothing).
541  *
542  * @param server the server receiving the message
543  * @param sender the "pretended" sender of the message
544  *        can be NULL!
545  * @param message message to transmit
546  * @return GNUNET_OK if the message was OK and the
547  *                   connection can stay open
548  *         GNUNET_SYSERR if the connection to the
549  *         client should be shut down
550  */
551 int
552 GNUNET_SERVER_inject (struct GNUNET_SERVER_Handle *server,
553                       struct GNUNET_SERVER_Client *sender,
554                       const struct GNUNET_MessageHeader *message)
555 {
556   struct HandlerList *pos;
557   const struct GNUNET_SERVER_MessageHandler *mh;
558   unsigned int i;
559   uint16_t type;
560   uint16_t size;
561   int found;
562
563   type = ntohs (message->type);
564   size = ntohs (message->size);
565 #if DEBUG_SERVER
566   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567               "Server schedules transmission of %u-byte message of type %u to client.\n",
568               size, type);
569 #endif
570   pos = server->handlers;
571   found = GNUNET_NO;
572   while (pos != NULL)
573     {
574       i = 0;
575       while (pos->handlers[i].callback != NULL)
576         {
577           mh = &pos->handlers[i];
578           if (mh->type == type)
579             {
580               if ((mh->expected_size != 0) && (mh->expected_size != size))
581                 {
582                   GNUNET_break_op (0);
583                   return GNUNET_SYSERR;
584                 }
585               if (sender != NULL)
586                 sender->suspended++;
587               mh->callback (mh->callback_cls, sender, message);
588               found = GNUNET_YES;
589             }
590           i++;
591         }
592       pos = pos->next;
593     }
594   if (found == GNUNET_NO)
595     {
596       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
597                   _("Received message of unknown type %d\n"), type);
598       if (server->require_found == GNUNET_YES)
599         return GNUNET_SYSERR;
600     }
601   return GNUNET_OK;
602 }
603
604
605 /**
606  * We're finished with this client and especially its input
607  * processing.  If the RC is zero, free all resources otherwise wait
608  * until RC hits zero to do so.
609  */
610 static void
611 shutdown_incoming_processing (struct GNUNET_SERVER_Client *client)
612 {
613   struct GNUNET_SERVER_Client *prev;
614   struct GNUNET_SERVER_Client *pos;
615   struct GNUNET_SERVER_Handle *server;
616   struct NotifyList *n;
617   unsigned int rc;
618
619   rc = client->reference_count;
620   if (client->server != NULL)
621     {
622       server = client->server;
623       client->server = NULL;
624       prev = NULL;
625       pos = server->clients;
626       while ((pos != NULL) && (pos != client))
627         {
628           prev = pos;
629           pos = pos->next;
630         }
631       GNUNET_assert (pos != NULL);
632       if (prev == NULL)
633         server->clients = pos->next;
634       else
635         prev->next = pos->next;
636       n = server->disconnect_notify_list;
637       while (n != NULL)
638         {
639           n->callback (n->callback_cls, client);
640           n = n->next;
641         }
642     }
643   /* wait for RC to hit zero, then free */
644   if (rc > 0)
645     return;
646   client->destroy (client->client_closure);
647   GNUNET_free (client);
648 }
649
650
651 /**
652  * Go over the contents of the client buffer; as long as full messages
653  * are available, pass them on for processing.  Update the buffer
654  * accordingly.  Handles fatal errors by shutting down the connection.
655  *
656  * @param client identifies which client receive buffer to process
657  */
658 static void
659 process_client_buffer (struct GNUNET_SERVER_Client *client)
660 {
661   struct GNUNET_SERVER_Handle *server;
662   const struct GNUNET_MessageHeader *hdr;
663   size_t msize;
664
665   client->in_process_client_buffer = GNUNET_YES;
666   server = client->server;
667 #if DEBUG_SERVER
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "Private buffer contains %u bytes; client is %s and we are %s\n",
670               client->receive_pos,
671               client->suspended ? "suspended" : "up",
672               client->shutdown_now ? "in shutdown" : "running");
673 #endif
674   while ((client->receive_pos >= sizeof (struct GNUNET_MessageHeader)) &&
675          (0 == client->suspended) && (GNUNET_YES != client->shutdown_now))
676     {
677       hdr = (const struct GNUNET_MessageHeader *) &client->incoming_buffer;
678       msize = ntohs (hdr->size);
679       if (msize > client->receive_pos)
680         {
681 #if DEBUG_SERVER
682           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
683                       "Total message size is %u, we only have %u bytes; need more data\n",
684                       msize, client->receive_pos);
685 #endif
686           break;
687         }
688 #if DEBUG_SERVER
689       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
690                   "Passing %u bytes to callback for processing\n", msize);
691 #endif
692       if ((msize < sizeof (struct GNUNET_MessageHeader)) ||
693           (GNUNET_OK != GNUNET_SERVER_inject (server, client, hdr)))
694         {
695           client->in_process_client_buffer = GNUNET_NO;
696           shutdown_incoming_processing (client);
697           return;
698         }
699       /* FIXME: this is highly inefficient; we should
700          try to avoid this if the new base address is
701          already nicely aligned.  See old handler code... */
702       memmove (client->incoming_buffer,
703                &client->incoming_buffer[msize], client->receive_pos - msize);
704       client->receive_pos -= msize;
705     }
706   client->in_process_client_buffer = GNUNET_NO;
707   if (GNUNET_YES == client->shutdown_now)
708     shutdown_incoming_processing (client);
709 }
710
711
712 /**
713  * We are receiving an incoming message.  Process it.
714  *
715  * @param cls our closure (handle for the client)
716  * @param buf buffer with data received from network
717  * @param available number of bytes available in buf
718  * @param addr address of the sender
719  * @param addrlen length of addr
720  * @param errCode code indicating errors receiving, 0 for success
721  */
722 static void
723 process_incoming (void *cls,
724                   const void *buf,
725                   size_t available,
726                   const struct sockaddr *addr, socklen_t addrlen, int errCode)
727 {
728   struct GNUNET_SERVER_Client *client = cls;
729   struct GNUNET_SERVER_Handle *server = client->server;
730   const char *cbuf = buf;
731   size_t maxcpy;
732
733   if ((buf == NULL) ||
734       (available == 0) ||
735       (errCode != 0) ||
736       (server == NULL) ||
737       (client->shutdown_now == GNUNET_YES) ||
738       (GNUNET_YES != client->check (client->client_closure)))
739     {
740       /* other side closed connection, error connecting, etc. */
741       shutdown_incoming_processing (client);
742       return;
743     }
744 #if DEBUG_SERVER
745   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746               "Server receives %u bytes from `%s'.\n",
747               available, GNUNET_a2s (addr, addrlen));
748 #endif
749   GNUNET_SERVER_client_keep (client);
750   client->last_activity = GNUNET_TIME_absolute_get ();
751   /* process data (if available) */
752   while (available > 0)
753     {
754       maxcpy = available;
755       if (maxcpy > sizeof (client->incoming_buffer) - client->receive_pos)
756         maxcpy = sizeof (client->incoming_buffer) - client->receive_pos;
757 #if DEBUG_SERVER
758       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759                   "Can copy %u bytes to private buffer\n", maxcpy);
760 #endif
761       memcpy (&client->incoming_buffer[client->receive_pos], cbuf, maxcpy);
762       client->receive_pos += maxcpy;
763       cbuf += maxcpy;
764       available -= maxcpy;
765       if (0 < client->suspended)
766         {
767           if (available > 0)
768             {
769 #if DEBUG_SERVER
770               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771                           "Client has suspended processing; copying %u bytes to side buffer to be used later.\n",
772                           available);
773 #endif
774               GNUNET_assert (client->side_buf_size == 0);
775               GNUNET_assert (client->side_buf == NULL);
776               client->side_buf_size = available;
777               client->side_buf = GNUNET_malloc (available);
778               memcpy (client->side_buf, cbuf, available);
779               available = 0;
780             }
781           break;                /* do not run next client iteration! */
782         }
783 #if DEBUG_SERVER
784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785                   "Now processing messages in private buffer\n");
786 #endif
787       process_client_buffer (client);
788     }
789   GNUNET_assert (available == 0);
790   if ((client->suspended == 0) &&
791       (GNUNET_YES != client->shutdown_now) && (client->server != NULL))
792     {
793       /* Finally, keep receiving! */
794       client->receive (client->client_closure,
795                        GNUNET_SERVER_MAX_MESSAGE_SIZE,
796                        server->idle_timeout, &process_incoming, client);
797     }
798   if (GNUNET_YES == client->shutdown_now)
799     shutdown_incoming_processing (client);
800   GNUNET_SERVER_client_drop (client);
801 }
802
803
804 /**
805  * FIXME: document.
806  */
807 static void
808 restart_processing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
809 {
810   struct GNUNET_SERVER_Client *client = cls;
811
812   process_client_buffer (client);
813   if (0 == client->suspended)
814     client->receive (client->client_closure,
815                      GNUNET_SERVER_MAX_MESSAGE_SIZE,
816                      client->server->idle_timeout, &process_incoming, client);
817 }
818
819
820 /**
821  * Add a client to the set of our clients and
822  * start receiving.
823  */
824 static void
825 add_client (struct GNUNET_SERVER_Handle *server,
826             struct GNUNET_SERVER_Client *client)
827 {
828   client->server = server;
829   client->last_activity = GNUNET_TIME_absolute_get ();
830   client->next = server->clients;
831   server->clients = client;
832   client->receive (client->client_closure,
833                    GNUNET_SERVER_MAX_MESSAGE_SIZE,
834                    server->idle_timeout, &process_incoming, client);
835 }
836
837
838 /**
839  * Create a request for receiving data from a socket.
840  *
841  * @param cls identifies the socket to receive from
842  * @param max how much data to read at most
843  * @param timeout when should this operation time out
844  * @param receiver function to call for processing
845  * @param receiver_cls closure for receiver
846  */
847 static void
848 sock_receive (void *cls,
849               size_t max,
850               struct GNUNET_TIME_Relative timeout,
851               GNUNET_CONNECTION_Receiver receiver, void *receiver_cls)
852 {
853   GNUNET_CONNECTION_receive (cls, max, timeout, receiver, receiver_cls);
854 }
855
856
857 /**
858  * Wrapper to cancel receiving from a socket.
859  * 
860  * @param cls handle to the GNUNET_CONNECTION_Handle to cancel
861  */
862 static void
863 sock_receive_cancel (void *cls)
864 {
865   GNUNET_CONNECTION_receive_cancel (cls);
866 }
867
868
869 /**
870  * FIXME: document.
871  */
872 static void *
873 sock_notify_transmit_ready (void *cls,
874                             size_t size,
875                             struct GNUNET_TIME_Relative timeout,
876                             GNUNET_CONNECTION_TransmitReadyNotify notify,
877                             void *notify_cls)
878 {
879   return GNUNET_CONNECTION_notify_transmit_ready (cls, size, timeout, notify,
880                                                   notify_cls);
881 }
882
883
884 /**
885  * FIXME: document.
886  */
887 static void
888 sock_notify_transmit_ready_cancel (void *cls, void *h)
889 {
890   GNUNET_CONNECTION_notify_transmit_ready_cancel (h);
891 }
892
893
894 /**
895  * Check if socket is still valid (no fatal errors have happened so far).
896  *
897  * @param cls the socket
898  * @return GNUNET_YES if valid, GNUNET_NO otherwise
899  */
900 static int
901 sock_check (void *cls)
902 {
903   return GNUNET_CONNECTION_check (cls);
904 }
905
906
907 /**
908  * Destroy this socket (free resources).
909  *
910  * @param cls the socket
911  */
912 static void
913 sock_destroy (void *cls)
914 {
915   GNUNET_CONNECTION_destroy (cls);
916 }
917
918
919 /**
920  * Add a TCP socket-based connection to the set of handles managed by
921  * this server.  Use this function for outgoing (P2P) connections that
922  * we initiated (and where this server should process incoming
923  * messages).
924  *
925  * @param server the server to use
926  * @param connection the connection to manage (client must
927  *        stop using this connection from now on)
928  * @return the client handle (client should call
929  *         "client_drop" on the return value eventually)
930  */
931 struct GNUNET_SERVER_Client *
932 GNUNET_SERVER_connect_socket (struct
933                               GNUNET_SERVER_Handle
934                               *server,
935                               struct GNUNET_CONNECTION_Handle *connection)
936 {
937   struct GNUNET_SERVER_Client *client;
938
939   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
940   client->client_closure = connection;
941   client->receive = &sock_receive;
942   client->receive_cancel = &sock_receive_cancel;
943   client->notify_transmit_ready = &sock_notify_transmit_ready;
944   client->notify_transmit_ready_cancel = &sock_notify_transmit_ready_cancel;
945   client->check = &sock_check;
946   client->destroy = &sock_destroy;
947   client->reference_count = 1;
948   add_client (server, client);
949   return client;
950 }
951
952
953 /**
954  * Add an arbitrary connection to the set of handles managed by this
955  * server.  This can be used if a sending and receiving does not
956  * really go over the network (internal transmission) or for servers
957  * using UDP.
958  *
959  * @param server the server to use
960  * @param chandle opaque handle for the connection
961  * @param creceive receive function for the connection
962  * @param ccancel cancel receive function for the connection
963  * @param cnotify transmit notification function for the connection
964  * @param cnotify_cancel transmit notification cancellation function for the connection
965  * @param ccheck function to test if the connection is still up
966  * @param cdestroy function to close and free the connection
967  * @return the client handle (client should call
968  *         "client_drop" on the return value eventually)
969  */
970 struct GNUNET_SERVER_Client *
971 GNUNET_SERVER_connect_callback (struct
972                                 GNUNET_SERVER_Handle
973                                 *server,
974                                 void *chandle,
975                                 GNUNET_SERVER_ReceiveCallback
976                                 creceive,
977                                 GNUNET_SERVER_ReceiveCancelCallback
978                                 ccancel,
979                                 GNUNET_SERVER_TransmitReadyCallback
980                                 cnotify,
981                                 GNUNET_SERVER_TransmitReadyCancelCallback
982                                 cnotify_cancel,
983                                 GNUNET_SERVER_CheckCallback
984                                 ccheck,
985                                 GNUNET_SERVER_DestroyCallback cdestroy)
986 {
987   struct GNUNET_SERVER_Client *client;
988
989   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
990   client->client_closure = chandle;
991   client->receive = creceive;
992   client->receive_cancel = ccancel;
993   client->notify_transmit_ready = cnotify;
994   client->notify_transmit_ready_cancel = cnotify_cancel;
995   client->check = ccheck;
996   client->destroy = cdestroy;
997   client->reference_count = 1;
998   add_client (server, client);
999   return client;
1000 }
1001
1002
1003 /**
1004  * Notify the server that the given client handle should
1005  * be kept (keeps the connection up if possible, increments
1006  * the internal reference counter).
1007  *
1008  * @param client the client to keep
1009  */
1010 void
1011 GNUNET_SERVER_client_keep (struct GNUNET_SERVER_Client *client)
1012 {
1013   client->reference_count++;
1014 }
1015
1016
1017 /**
1018  * Notify the server that the given client handle is no
1019  * longer required.  Decrements the reference counter.  If
1020  * that counter reaches zero an inactive connection maybe
1021  * closed.
1022  *
1023  * @param client the client to drop
1024  */
1025 void
1026 GNUNET_SERVER_client_drop (struct GNUNET_SERVER_Client *client)
1027 {
1028   GNUNET_assert (client->reference_count > 0);
1029   client->reference_count--;
1030   if ((client->server == NULL) && (client->reference_count == 0))
1031     shutdown_incoming_processing (client);
1032 }
1033
1034
1035 /**
1036  * Obtain the network address of the other party.
1037  *
1038  * @param client the client to get the address for
1039  * @param addr where to store the address
1040  * @param addrlen where to store the length of the address
1041  * @return GNUNET_OK on success
1042  */
1043 int
1044 GNUNET_SERVER_client_get_address (struct GNUNET_SERVER_Client *client,
1045                                   void **addr, size_t * addrlen)
1046 {
1047   if (client->receive != &sock_receive)
1048     return GNUNET_SYSERR;       /* not a network client */
1049   return GNUNET_CONNECTION_get_address (client->client_closure,
1050                                         addr, addrlen);
1051 }
1052
1053
1054 /**
1055  * Ask the server to notify us whenever a client disconnects.
1056  * This function is called whenever the actual network connection
1057  * is closed; the reference count may be zero or larger than zero
1058  * at this point.
1059  *
1060  * @param server the server manageing the clients
1061  * @param callback function to call on disconnect
1062  * @param callback_cls closure for callback
1063  */
1064 void
1065 GNUNET_SERVER_disconnect_notify (struct GNUNET_SERVER_Handle *server,
1066                                  GNUNET_SERVER_DisconnectCallback callback,
1067                                  void *callback_cls)
1068 {
1069   struct NotifyList *n;
1070
1071   n = GNUNET_malloc (sizeof (struct NotifyList));
1072   n->callback = callback;
1073   n->callback_cls = callback_cls;
1074   n->next = server->disconnect_notify_list;
1075   server->disconnect_notify_list = n;
1076 }
1077
1078
1079 /**
1080  * Ask the server to disconnect from the given client.
1081  * This is the same as returning GNUNET_SYSERR from a message
1082  * handler, except that it allows dropping of a client even
1083  * when not handling a message from that client.
1084  *
1085  * @param client the client to disconnect from
1086  */
1087 void
1088 GNUNET_SERVER_client_disconnect (struct GNUNET_SERVER_Client *client)
1089 {
1090   if (client->server == NULL)
1091     return;                     /* already disconnected */
1092   client->receive_cancel (client->client_closure);
1093   shutdown_incoming_processing (client);
1094 }
1095
1096
1097 /**
1098  * Notify us when the server has enough space to transmit
1099  * a message of the given size to the given client.
1100  *
1101  * @param client client to transmit message to
1102  * @param size requested amount of buffer space
1103  * @param timeout after how long should we give up (and call
1104  *        notify with buf NULL and size 0)?
1105  * @param callback function to call when space is available
1106  * @param callback_cls closure for callback
1107  * @return non-NULL if the notify callback was queued; can be used
1108  *           to cancel the request using
1109  *           GNUNET_CONNECTION_notify_transmit_ready_cancel.
1110  *         NULL if we are already going to notify someone else (busy)
1111  */
1112 struct GNUNET_CONNECTION_TransmitHandle *
1113 GNUNET_SERVER_notify_transmit_ready (struct GNUNET_SERVER_Client *client,
1114                                      size_t size,
1115                                      struct GNUNET_TIME_Relative timeout,
1116                                      GNUNET_CONNECTION_TransmitReadyNotify
1117                                      callback, void *callback_cls)
1118 {
1119   return client->notify_transmit_ready (client->client_closure,
1120                                         size,
1121                                         timeout, callback, callback_cls);
1122 }
1123
1124
1125 /**
1126  * Resume receiving from this client, we are done processing the
1127  * current request.  This function must be called from within each
1128  * GNUNET_SERVER_MessageCallback (or its respective continuations).
1129  *
1130  * @param client client we were processing a message of
1131  * @param success GNUNET_OK to keep the connection open and
1132  *                          continue to receive
1133  *                GNUNET_SYSERR to close the connection (signal
1134  *                          serious error)
1135  */
1136 void
1137 GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
1138 {
1139   char *sb;
1140
1141   if (client == NULL)
1142     return;
1143   GNUNET_assert (client->suspended > 0);
1144   client->suspended--;
1145   if (success != GNUNET_OK)
1146     client->shutdown_now = GNUNET_YES;
1147   if (client->suspended > 0)
1148     return;
1149   if (client->in_process_client_buffer == GNUNET_YES)
1150     return;
1151   if (client->side_buf_size > 0)
1152     {
1153       /* resume processing from side-buf */
1154       sb = client->side_buf;
1155       client->side_buf = NULL;
1156       /* this will also resume the receive job */
1157       if (GNUNET_YES != client->shutdown_now)
1158         process_incoming (client, sb, client->side_buf_size, NULL, 0, 0);
1159       else
1160         shutdown_incoming_processing (client);
1161       /* finally, free the side-buf */
1162       GNUNET_free (sb);
1163       return;
1164     }
1165   /* resume receive job */
1166   if (GNUNET_YES != client->shutdown_now)
1167     {
1168       GNUNET_SCHEDULER_add_continuation (client->server->sched,
1169                                          &restart_processing,
1170                                          client,
1171                                          GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1172       return;
1173     }
1174   shutdown_incoming_processing (client);
1175 }
1176
1177
1178 /* end of server.c */