bugfix
[oweals/gnunet.git] / src / util / server.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file util/server.c
23  * @brief library for building GNUnet network servers
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix inefficient memmove in message processing
28  */
29
30 #include "platform.h"
31 #include "gnunet_common.h"
32 #include "gnunet_network_lib.h"
33 #include "gnunet_scheduler_lib.h"
34 #include "gnunet_server_lib.h"
35 #include "gnunet_time_lib.h"
36
37 #define DEBUG_SERVER GNUNET_NO
38
39 /**
40  * List of arrays of message handlers.
41  */
42 struct HandlerList
43 {
44   /**
45    * This is a linked list.
46    */
47   struct HandlerList *next;
48
49   /**
50    * NULL-terminated array of handlers.
51    */
52   const struct GNUNET_SERVER_MessageHandler *handlers;
53 };
54
55
56 /**
57  * List of arrays of message handlers.
58  */
59 struct NotifyList
60 {
61   /**
62    * This is a linked list.
63    */
64   struct NotifyList *next;
65
66   /**
67    * Function to call.
68    */
69   GNUNET_SERVER_DisconnectCallback callback;
70
71   /**
72    * Closure for callback.
73    */
74   void *callback_cls;
75 };
76
77
78 /**
79  * @brief handle for a server
80  */
81 struct GNUNET_SERVER_Handle
82 {
83   /**
84    * My scheduler.
85    */
86   struct GNUNET_SCHEDULER_Handle *sched;
87
88   /**
89    * List of handlers for incoming messages.
90    */
91   struct HandlerList *handlers;
92
93   /**
94    * List of our current clients.
95    */
96   struct GNUNET_SERVER_Client *clients;
97
98   /**
99    * Linked list of functions to call on disconnects by clients.
100    */
101   struct NotifyList *disconnect_notify_list;
102
103   /**
104    * Function to call for access control.
105    */
106   GNUNET_NETWORK_AccessCheck access;
107
108   /**
109    * Closure for access.
110    */
111   void *access_cls;
112
113   /**
114    * After how long should an idle connection time
115    * out (on write).
116    */
117   struct GNUNET_TIME_Relative idle_timeout;
118
119   /**
120    * maximum write buffer size for accepted sockets
121    */
122   size_t maxbuf;
123
124   /**
125    * Pipe used to signal shutdown of the server.
126    */
127   int shutpipe[2];
128
129   /**
130    * Socket used to listen for new connections.  Set to
131    * "-1" by GNUNET_SERVER_destroy to initiate shutdown.
132    */
133   int listen_socket;
134
135   /**
136    * Set to GNUNET_YES if we are shutting down.
137    */
138   int do_shutdown;
139
140   /**
141    * Do we ignore messages of types that we do not
142    * understand or do we require that a handler
143    * is found (and if not kill the connection)?
144    */
145   int require_found;
146
147 };
148
149
150 /**
151  * @brief handle for a client of the server
152  */
153 struct GNUNET_SERVER_Client
154 {
155
156   /**
157    * Size of the buffer for incoming data.  Should be
158    * first so we get nice alignment.
159    */
160   char incoming_buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE];
161
162   /**
163    * This is a linked list.
164    */
165   struct GNUNET_SERVER_Client *next;
166
167   /**
168    * Server that this client belongs to.
169    */
170   struct GNUNET_SERVER_Handle *server;
171
172   /**
173    * Client closure for callbacks.
174    */
175   void *client_closure;
176
177   /**
178    * Callback to receive from client.
179    */
180   GNUNET_SERVER_ReceiveCallback receive;
181
182   /**
183    * Callback to cancel receive from client.
184    */
185   GNUNET_SERVER_ReceiveCancelCallback receive_cancel;
186
187   /**
188    * Callback to ask about transmit-ready notification.
189    */
190   GNUNET_SERVER_TransmitReadyCallback notify_transmit_ready;
191
192    /**
193    * Callback to ask about transmit-ready notification.
194    */
195   GNUNET_SERVER_TransmitReadyCancelCallback notify_transmit_ready_cancel;
196
197   /**
198    * Callback to check if client is still valid.
199    */
200   GNUNET_SERVER_CheckCallback check;
201
202   /**
203    * Callback to destroy client.
204    */
205   GNUNET_SERVER_DestroyCallback destroy;
206
207   /**
208    * Side-buffer for incoming data used when processing
209    * is suspended.
210    */
211   char *side_buf;
212
213   /**
214    * Number of bytes in the side buffer.
215    */
216   size_t side_buf_size;
217
218   /**
219    * Last activity on this socket (used to time it out
220    * if reference_count == 0).
221    */
222   struct GNUNET_TIME_Absolute last_activity;
223
224   /**
225    * Current task identifier for the receive call
226    * (or GNUNET_SCHEDULER_NO_PREREQUISITE_TASK for none).
227    */
228   GNUNET_SCHEDULER_TaskIdentifier my_receive;
229
230   /**
231    * How many bytes in the "incoming_buffer" are currently
232    * valid? (starting at offset 0).
233    */
234   size_t receive_pos;
235
236   /**
237    * Number of external entities with a reference to
238    * this client object.
239    */
240   unsigned int reference_count;
241
242   /**
243    * Was processing if incoming messages suspended while
244    * we were still processing data already received?
245    * This is a counter saying how often processing was
246    * suspended (once per handler invoked).
247    */
248   unsigned int suspended;
249
250   /**
251    * Are we currently in the "process_client_buffer" function (and
252    * will hence restart the receive job on exit if suspended == 0 once
253    * we are done?).  If this is set, then "receive_done" will
254    * essentially only decrement suspended; if this is not set, then
255    * "receive_done" may need to restart the receive process (either
256    * from the side-buffer or via select/recv).
257    */
258   int in_process_client_buffer;
259
260   /**
261    * We're about to close down this client due to some serious
262    * error.
263    */
264   int shutdown_now;
265
266 };
267
268
269 /**
270  * Server has been asked to shutdown, free resources.
271  */
272 static void
273 destroy_server (struct GNUNET_SERVER_Handle *server)
274 {
275   struct GNUNET_SERVER_Client *pos;
276   struct HandlerList *hpos;
277   struct NotifyList *npos;
278
279 #if DEBUG_SERVER
280   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281               "Server shutting down.\n");
282 #endif
283   GNUNET_assert (server->listen_socket == -1);
284   GNUNET_break (0 == CLOSE (server->shutpipe[0]));
285   GNUNET_break (0 == CLOSE (server->shutpipe[1]));
286   while (server->clients != NULL)
287     {
288       pos = server->clients;
289       server->clients = pos->next;
290       pos->server = NULL;
291     }
292   while (NULL != (hpos = server->handlers))
293     {
294       server->handlers = hpos->next;
295       GNUNET_free (hpos);
296     }
297   while (NULL != (npos = server->disconnect_notify_list))
298     {
299       server->disconnect_notify_list = npos->next;
300       GNUNET_free (npos);
301     }
302   GNUNET_free (server);
303 }
304
305
306 /**
307  * Scheduler says our listen socket is ready.
308  * Process it!
309  */
310 static void
311 process_listen_socket (void *cls,
312                        const struct GNUNET_SCHEDULER_TaskContext *tc)
313 {
314   struct GNUNET_SERVER_Handle *server = cls;
315   struct GNUNET_NETWORK_SocketHandle *sock;
316   struct GNUNET_SERVER_Client *client;
317   fd_set r;
318
319   if ((server->do_shutdown) ||
320       ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0))
321     {
322       /* shutdown was initiated */
323       GNUNET_assert (server->listen_socket != -1);
324       GNUNET_break (0 == CLOSE (server->listen_socket));
325       server->listen_socket = -1;
326       if (server->do_shutdown)
327         destroy_server (server);
328       return;
329     }
330   GNUNET_assert (FD_ISSET (server->listen_socket, tc->read_ready));
331   GNUNET_assert (!FD_ISSET (server->shutpipe[0], tc->read_ready));
332   sock = GNUNET_NETWORK_socket_create_from_accept (tc->sched,
333                                                    server->access,
334                                                    server->access_cls,
335                                                    server->listen_socket,
336                                                    server->maxbuf);
337   if (sock != NULL)
338     {
339 #if DEBUG_SERVER
340       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341                   "Server accepted incoming connection.\n");
342 #endif
343       client = GNUNET_SERVER_connect_socket (server, sock);
344       /* decrement reference count, we don't keep "client" alive */
345       GNUNET_SERVER_client_drop (client);
346     }
347   /* listen for more! */
348   FD_ZERO (&r);
349   FD_SET (server->listen_socket, &r);
350   FD_SET (server->shutpipe[0], &r);
351   GNUNET_SCHEDULER_add_select (server->sched,
352                                GNUNET_YES,
353                                GNUNET_SCHEDULER_PRIORITY_HIGH,
354                                GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
355                                GNUNET_TIME_UNIT_FOREVER_REL,
356                                GNUNET_MAX (server->listen_socket,
357                                            server->shutpipe[0]) + 1, &r, NULL,
358                                &process_listen_socket, server);
359 }
360
361
362 /**
363  * Create and initialize a listen socket for the server.
364  *
365  * @return -1 on error, otherwise the listen socket
366  */
367 static int
368 open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
369 {
370   const static int on = 1;
371   int fd;
372   uint16_t port;
373
374   switch (serverAddr->sa_family)
375     {
376     case AF_INET:
377       port = ntohs (((const struct sockaddr_in *) serverAddr)->sin_port);
378       break;
379     case AF_INET6:
380       port = ntohs (((const struct sockaddr_in6 *) serverAddr)->sin6_port);
381       break;
382     default:
383       GNUNET_break (0);
384       return -1;
385     }
386   fd = SOCKET (serverAddr->sa_family, SOCK_STREAM, 0);
387   if (fd < 0)
388     {
389       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
390       return -1;
391     }
392 #ifndef MINGW
393   // FIXME NILS
394   if (0 != fcntl (fd, F_SETFD, fcntl (fd, F_GETFD) | FD_CLOEXEC))
395     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
396                          "fcntl");
397 #endif
398   if (SETSOCKOPT (fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) < 0)
399     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
400                          "setsockopt");
401   /* bind the socket */
402   if (BIND (fd, serverAddr, socklen) < 0)
403     {
404       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind");
405       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
406                   _
407                   ("`%s' failed for port %d. Is the service already running?\n"),
408                   "bind", port);
409       GNUNET_break (0 == CLOSE (fd));
410       return -1;
411     }
412   if (0 != LISTEN (fd, 5))
413     {
414       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "listen");
415       GNUNET_break (0 == CLOSE (fd));
416       return -1;
417     }
418 #if DEBUG_SERVER
419       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
420                   "Server starts to listen on port %u.\n",
421                   port);
422 #endif
423   return fd;
424 }
425
426
427 /**
428  * Create a new server.
429  *
430  * @param sched scheduler to use
431  * @param access function for access control
432  * @param access_cls closure for access
433  * @param serverAddr address to listen on (including port), use NULL
434  *        for internal server (no listening)
435  * @param socklen length of serverAddr
436  * @param maxbuf maximum write buffer size for accepted sockets
437  * @param idle_timeout after how long should we timeout idle connections?
438  * @param require_found if YES, connections sending messages of unknown type
439  *        will be closed
440  * @return handle for the new server, NULL on error
441  *         (typically, "port" already in use)
442  */
443 struct GNUNET_SERVER_Handle *
444 GNUNET_SERVER_create (struct GNUNET_SCHEDULER_Handle *sched,
445                       GNUNET_NETWORK_AccessCheck access,
446                       void *access_cls,
447                       const struct sockaddr *serverAddr,
448                       socklen_t socklen,
449                       size_t maxbuf,
450                       struct GNUNET_TIME_Relative
451                       idle_timeout, int require_found)
452 {
453   struct GNUNET_SERVER_Handle *ret;
454   int lsock;
455   fd_set r;
456
457   lsock = -2;
458   if (serverAddr != NULL)
459     {
460       lsock = open_listen_socket (serverAddr, socklen);
461       if (lsock == -1)
462         return NULL;
463     }
464   ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Handle));
465   if (0 != PIPE (ret->shutpipe))
466     {
467       GNUNET_break (0 == CLOSE (lsock));
468       GNUNET_free (ret);
469       return NULL;
470     }
471   ret->sched = sched;
472   ret->maxbuf = maxbuf;
473   ret->idle_timeout = idle_timeout;
474   ret->listen_socket = lsock;
475   ret->access = access;
476   ret->access_cls = access_cls;
477   ret->require_found = require_found;
478   if (lsock >= 0)
479     {
480       FD_ZERO (&r);
481       FD_SET (ret->listen_socket, &r);
482       FD_SET (ret->shutpipe[0], &r);
483       GNUNET_SCHEDULER_add_select (sched,
484                                    GNUNET_YES,
485                                    GNUNET_SCHEDULER_PRIORITY_HIGH,
486                                    GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
487                                    GNUNET_TIME_UNIT_FOREVER_REL,
488                                    GNUNET_MAX (ret->listen_socket,
489                                                ret->shutpipe[0]) + 1, &r,
490                                    NULL, &process_listen_socket, ret);
491     }
492   return ret;
493 }
494
495
496 /**
497  * Free resources held by this server.
498  */
499 void
500 GNUNET_SERVER_destroy (struct GNUNET_SERVER_Handle *s)
501 {
502   static char c;
503
504   GNUNET_assert (s->do_shutdown == GNUNET_NO);
505   s->do_shutdown = GNUNET_YES;
506   if (s->listen_socket == -1)
507     destroy_server (s);
508   else
509     GNUNET_break (1 == WRITE (s->shutpipe[1], &c, 1));
510 }
511
512
513 /**
514  * Add additional handlers to an existing server.
515  *
516  * @param server the server to add handlers to
517  * @param handlers array of message handlers for
518  *        incoming messages; the last entry must
519  *        have "NULL" for the "callback"; multiple
520  *        entries for the same type are allowed,
521  *        they will be called in order of occurence.
522  *        These handlers can be removed later;
523  *        the handlers array must exist until removed
524  *        (or server is destroyed).
525  */
526 void
527 GNUNET_SERVER_add_handlers (struct GNUNET_SERVER_Handle *server,
528                             const struct GNUNET_SERVER_MessageHandler
529                             *handlers)
530 {
531   struct HandlerList *p;
532
533   p = GNUNET_malloc (sizeof (struct HandlerList));
534   p->handlers = handlers;
535   p->next = server->handlers;
536   server->handlers = p;
537 }
538
539
540 /**
541  * Inject a message into the server, pretend it came
542  * from the specified client.  Delivery of the message
543  * will happen instantly (if a handler is installed;
544  * otherwise the call does nothing).
545  *
546  * @param server the server receiving the message
547  * @param sender the "pretended" sender of the message
548  *        can be NULL!
549  * @param message message to transmit
550  * @return GNUNET_OK if the message was OK and the
551  *                   connection can stay open
552  *         GNUNET_SYSERR if the connection to the
553  *         client should be shut down
554  */
555 int
556 GNUNET_SERVER_inject (struct GNUNET_SERVER_Handle *server,
557                       struct GNUNET_SERVER_Client *sender,
558                       const struct GNUNET_MessageHeader *message)
559 {
560   struct HandlerList *pos;
561   const struct GNUNET_SERVER_MessageHandler *mh;
562   unsigned int i;
563   uint16_t type;
564   uint16_t size;
565   int found;
566
567   type = ntohs (message->type);
568   size = ntohs (message->size);
569 #if DEBUG_SERVER
570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571               "Server schedules transmission of %u-byte message of type %u to client.\n",
572               size,
573               type);
574 #endif
575   pos = server->handlers;
576   found = GNUNET_NO;
577   while (pos != NULL)
578     {
579       i = 0;
580       while (pos->handlers[i].callback != NULL)
581         {
582           mh = &pos->handlers[i];
583           if (mh->type == type)
584             {
585               if ((mh->expected_size != 0) && (mh->expected_size != size))
586                 {
587                   GNUNET_break_op (0);
588                   return GNUNET_SYSERR;
589                 }
590               if (sender != NULL)
591                 sender->suspended++;
592               mh->callback (mh->callback_cls, sender, message);
593               found = GNUNET_YES;
594             }
595           i++;
596         }
597       pos = pos->next;
598     }
599   if (found == GNUNET_NO)
600     {
601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
602                   _("Received message of unknown type %d\n"), type);
603       if (server->require_found == GNUNET_YES)
604         return GNUNET_SYSERR;
605     }
606   return GNUNET_OK;
607 }
608
609
610 /**
611  * We're finished with this client and especially its input
612  * processing.  If the RC is zero, free all resources otherwise wait
613  * until RC hits zero to do so.
614  */
615 static void
616 shutdown_incoming_processing (struct GNUNET_SERVER_Client *client)
617 {
618   struct GNUNET_SERVER_Client *prev;
619   struct GNUNET_SERVER_Client *pos;
620   struct GNUNET_SERVER_Handle *server;
621   struct NotifyList *n;
622   unsigned int rc;
623
624   GNUNET_assert (client->my_receive == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
625   rc = client->reference_count;
626   if (client->server != NULL)
627     {
628       server = client->server;
629       client->server = NULL;
630       prev = NULL;
631       pos = server->clients;
632       while ((pos != NULL) && (pos != client))
633         {
634           prev = pos;
635           pos = pos->next;
636         }
637       GNUNET_assert (pos != NULL);
638       if (prev == NULL)
639         server->clients = pos->next;
640       else
641         prev->next = pos->next;
642       n = server->disconnect_notify_list;
643       while (n != NULL)
644         {
645           n->callback (n->callback_cls, client);
646           n = n->next;
647         }
648     }
649   /* wait for RC to hit zero, then free */
650   if (rc > 0)
651     return;
652   client->destroy (client->client_closure);
653   GNUNET_free (client);
654 }
655
656
657 /**
658  * Go over the contents of the client buffer; as long as full messages
659  * are available, pass them on for processing.  Update the buffer
660  * accordingly.  Handles fatal errors by shutting down the connection.
661  *
662  * @param client identifies which client receive buffer to process
663  */
664 static void
665 process_client_buffer (struct GNUNET_SERVER_Client *client)
666 {
667   struct GNUNET_SERVER_Handle *server;
668   const struct GNUNET_MessageHeader *hdr;
669   size_t msize;
670
671   client->in_process_client_buffer = GNUNET_YES;
672   server = client->server;
673 #if DEBUG_SERVER
674   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
675               "Private buffer contains %u bytes; client is %s and we are %s\n",
676               client->receive_pos,
677               client->suspended ? "suspended" : "up",
678               client->shutdown_now ? "in shutdown" : "running");
679 #endif
680   while ((client->receive_pos >= sizeof (struct GNUNET_MessageHeader)) &&
681          (0 == client->suspended) && (GNUNET_YES != client->shutdown_now))
682     {
683       hdr = (const struct GNUNET_MessageHeader *) &client->incoming_buffer;
684       msize = ntohs (hdr->size);
685       if (msize > client->receive_pos)
686         {
687 #if DEBUG_SERVER
688           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689                       "Total message size is %u, we only have %u bytes; need more data\n",
690                       msize,
691                       client->receive_pos);
692 #endif
693           break;
694         }
695 #if DEBUG_SERVER
696       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697                   "Passing %u bytes to callback for processing\n",
698                   msize);
699 #endif
700       if ((msize < sizeof (struct GNUNET_MessageHeader)) ||
701           (GNUNET_OK != GNUNET_SERVER_inject (server, client, hdr)))
702         {
703           client->in_process_client_buffer = GNUNET_NO;
704           shutdown_incoming_processing (client);
705           return;
706         }
707       /* FIXME: this is highly inefficient; we should
708          try to avoid this if the new base address is
709          already nicely aligned.  See old handler code... */
710       memmove (client->incoming_buffer,
711                &client->incoming_buffer[msize], client->receive_pos - msize);
712       client->receive_pos -= msize;
713     }
714   client->in_process_client_buffer = GNUNET_NO;
715   if (GNUNET_YES == client->shutdown_now)
716     shutdown_incoming_processing (client);
717 }
718
719
720 /**
721  * We are receiving an incoming message.  Process it.
722  *
723  * @param cls our closure (handle for the client)
724  * @param buf buffer with data received from network
725  * @param available number of bytes available in buf
726  * @param addr address of the sender
727  * @param addrlen length of addr
728  * @param errCode code indicating errors receiving, 0 for success
729  */
730 static void
731 process_incoming (void *cls,
732                   const void *buf,
733                   size_t available,
734                   const struct sockaddr *addr, 
735                   socklen_t addrlen,
736                   int errCode)
737 {
738   struct GNUNET_SERVER_Client *client = cls;
739   struct GNUNET_SERVER_Handle *server = client->server;
740   const char *cbuf = buf;
741   size_t maxcpy;
742
743   client->my_receive = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
744   if ((buf == NULL) ||
745       (available == 0) ||
746       (errCode != 0) ||
747       (server == NULL) ||
748       (client->shutdown_now == GNUNET_YES) ||
749       (GNUNET_YES != client->check (client->client_closure)))
750     {
751       /* other side closed connection, error connecting, etc. */
752       shutdown_incoming_processing (client);
753       return;
754     }
755 #if DEBUG_SERVER
756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
757               "Server receives %u bytes from `%s'.\n",
758               available,
759               GNUNET_a2s(addr, addrlen));
760 #endif
761   GNUNET_SERVER_client_keep (client);
762   client->last_activity = GNUNET_TIME_absolute_get ();
763   /* process data (if available) */
764   while (available > 0)
765     {
766       maxcpy = available;
767       if (maxcpy > sizeof (client->incoming_buffer) - client->receive_pos)
768         maxcpy = sizeof (client->incoming_buffer) - client->receive_pos;
769 #if DEBUG_SERVER
770       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771                   "Can copy %u bytes to private buffer\n",
772                   maxcpy);
773 #endif
774       memcpy (&client->incoming_buffer[client->receive_pos], cbuf, maxcpy);
775       client->receive_pos += maxcpy;
776       cbuf += maxcpy;
777       available -= maxcpy;
778       if (0 < client->suspended)
779         {
780           if (available > 0)
781             {
782 #if DEBUG_SERVER
783               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784                           "Client has suspended processing; copying %u bytes to side buffer to be used later.\n",
785                           available);
786 #endif
787               GNUNET_assert (client->side_buf_size == 0);
788               GNUNET_assert (client->side_buf == NULL);
789               client->side_buf_size = available;
790               client->side_buf = GNUNET_malloc (available);
791               memcpy (client->side_buf, cbuf, available);
792               available = 0;
793             }
794           break;                /* do not run next client iteration! */
795         }
796 #if DEBUG_SERVER
797       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
798                   "Now processing messages in private buffer\n");
799 #endif
800       process_client_buffer (client);
801     }
802   GNUNET_assert (available == 0);
803   if ((client->suspended == 0) &&
804       (GNUNET_YES != client->shutdown_now) && (client->server != NULL))
805     {
806       /* Finally, keep receiving! */
807       client->my_receive = client->receive (client->client_closure,
808                                             GNUNET_SERVER_MAX_MESSAGE_SIZE,
809                                             server->idle_timeout,
810                                             &process_incoming, client);
811     }
812   if (GNUNET_YES == client->shutdown_now)
813     shutdown_incoming_processing (client);
814   GNUNET_SERVER_client_drop (client);
815 }
816
817
818 /**
819  * FIXME: document.
820  */
821 static void
822 restart_processing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
823 {
824   struct GNUNET_SERVER_Client *client = cls;
825
826   process_client_buffer (client);
827   if (0 == client->suspended)
828     client->my_receive = client->receive (client->client_closure,
829                                           GNUNET_SERVER_MAX_MESSAGE_SIZE,
830                                           client->server->idle_timeout,
831                                           &process_incoming, client);
832 }
833
834
835 /**
836  * Add a client to the set of our clients and
837  * start receiving.
838  */
839 static void
840 add_client (struct GNUNET_SERVER_Handle *server,
841             struct GNUNET_SERVER_Client *client)
842 {
843   client->server = server;
844   client->last_activity = GNUNET_TIME_absolute_get ();
845   client->next = server->clients;
846   server->clients = client;
847   client->my_receive = client->receive (client->client_closure,
848                                         GNUNET_SERVER_MAX_MESSAGE_SIZE,
849                                         server->idle_timeout,
850                                         &process_incoming, client);
851 }
852
853
854 /**
855  * Create a request for receiving data from a socket.
856  *
857  * @param cls identifies the socket to receive from
858  * @param max how much data to read at most
859  * @param timeout when should this operation time out
860  * @param receiver function to call for processing
861  * @param receiver_cls closure for receiver
862  * @return task identifier that can be used to cancel the operation
863  */
864 static GNUNET_SCHEDULER_TaskIdentifier
865 sock_receive (void *cls,
866               size_t max,
867               struct GNUNET_TIME_Relative timeout,
868               GNUNET_NETWORK_Receiver receiver, void *receiver_cls)
869 {
870   return GNUNET_NETWORK_receive (cls, max, timeout, receiver, receiver_cls);
871 }
872
873
874 /**
875  * Wrapper to cancel receiving from a socket.
876  * 
877  * @param cls handle to the GNUNET_NETWORK_SocketHandle to cancel
878  * @param tc task ID that was returned by GNUNET_NETWORK_receive
879  */
880 static void
881 sock_receive_cancel (void *cls, GNUNET_SCHEDULER_TaskIdentifier ti)
882 {
883   GNUNET_NETWORK_receive_cancel (cls, ti);
884 }
885
886
887 /**
888  * FIXME: document.
889  */
890 static void *
891 sock_notify_transmit_ready (void *cls,
892                             size_t size,
893                             struct GNUNET_TIME_Relative timeout,
894                             GNUNET_NETWORK_TransmitReadyNotify notify,
895                             void *notify_cls)
896 {
897   return GNUNET_NETWORK_notify_transmit_ready (cls, size, timeout, notify,
898                                                notify_cls);
899 }
900
901
902 /**
903  * FIXME: document.
904  */
905 static void
906 sock_notify_transmit_ready_cancel (void *cls, void *h)
907 {
908   GNUNET_NETWORK_notify_transmit_ready_cancel (h);
909 }
910
911
912 /**
913  * Check if socket is still valid (no fatal errors have happened so far).
914  *
915  * @param cls the socket
916  * @return GNUNET_YES if valid, GNUNET_NO otherwise
917  */
918 static int
919 sock_check (void *cls)
920 {
921   return GNUNET_NETWORK_socket_check (cls);
922 }
923
924
925 /**
926  * Destroy this socket (free resources).
927  *
928  * @param cls the socket
929  */
930 static void
931 sock_destroy (void *cls)
932 {
933   GNUNET_NETWORK_socket_destroy (cls);
934 }
935
936
937 /**
938  * Add a TCP socket-based connection to the set of handles managed by
939  * this server.  Use this function for outgoing (P2P) connections that
940  * we initiated (and where this server should process incoming
941  * messages).
942  *
943  * @param server the server to use
944  * @param connection the connection to manage (client must
945  *        stop using this connection from now on)
946  * @return the client handle (client should call
947  *         "client_drop" on the return value eventually)
948  */
949 struct GNUNET_SERVER_Client *
950 GNUNET_SERVER_connect_socket (struct
951                               GNUNET_SERVER_Handle
952                               *server,
953                               struct GNUNET_NETWORK_SocketHandle *connection)
954 {
955   struct GNUNET_SERVER_Client *client;
956
957   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
958   client->client_closure = connection;
959   client->receive = &sock_receive;
960   client->receive_cancel = &sock_receive_cancel;
961   client->notify_transmit_ready = &sock_notify_transmit_ready;
962   client->notify_transmit_ready_cancel = &sock_notify_transmit_ready_cancel;
963   client->check = &sock_check;
964   client->destroy = &sock_destroy;
965   client->reference_count = 1;
966   add_client (server, client);
967   return client;
968 }
969
970
971 /**
972  * Add an arbitrary connection to the set of handles managed by this
973  * server.  This can be used if a sending and receiving does not
974  * really go over the network (internal transmission) or for servers
975  * using UDP.
976  *
977  * @param server the server to use
978  * @param chandle opaque handle for the connection
979  * @param creceive receive function for the connection
980  * @param ccancel cancel receive function for the connection
981  * @param cnotify transmit notification function for the connection
982  * @param cnotify_cancel transmit notification cancellation function for the connection
983  * @param ccheck function to test if the connection is still up
984  * @param cdestroy function to close and free the connection
985  * @return the client handle (client should call
986  *         "client_drop" on the return value eventually)
987  */
988 struct GNUNET_SERVER_Client *
989 GNUNET_SERVER_connect_callback (struct
990                                 GNUNET_SERVER_Handle
991                                 *server,
992                                 void *chandle,
993                                 GNUNET_SERVER_ReceiveCallback
994                                 creceive,
995                                 GNUNET_SERVER_ReceiveCancelCallback
996                                 ccancel,
997                                 GNUNET_SERVER_TransmitReadyCallback
998                                 cnotify,
999                                 GNUNET_SERVER_TransmitReadyCancelCallback
1000                                 cnotify_cancel,
1001                                 GNUNET_SERVER_CheckCallback
1002                                 ccheck,
1003                                 GNUNET_SERVER_DestroyCallback cdestroy)
1004 {
1005   struct GNUNET_SERVER_Client *client;
1006
1007   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
1008   client->client_closure = chandle;
1009   client->receive = creceive;
1010   client->receive_cancel = ccancel;
1011   client->notify_transmit_ready = cnotify;
1012   client->notify_transmit_ready_cancel = cnotify_cancel;
1013   client->check = ccheck;
1014   client->destroy = cdestroy;
1015   client->reference_count = 1;
1016   add_client (server, client);
1017   return client;
1018 }
1019
1020
1021 /**
1022  * Notify the server that the given client handle should
1023  * be kept (keeps the connection up if possible, increments
1024  * the internal reference counter).
1025  *
1026  * @param client the client to keep
1027  */
1028 void
1029 GNUNET_SERVER_client_keep (struct GNUNET_SERVER_Client *client)
1030 {
1031   client->reference_count++;
1032 }
1033
1034
1035 /**
1036  * Notify the server that the given client handle is no
1037  * longer required.  Decrements the reference counter.  If
1038  * that counter reaches zero an inactive connection maybe
1039  * closed.
1040  *
1041  * @param client the client to drop
1042  */
1043 void
1044 GNUNET_SERVER_client_drop (struct GNUNET_SERVER_Client *client)
1045 {
1046   GNUNET_assert (client->reference_count > 0);
1047   client->reference_count--;
1048   if ((client->server == NULL) && (client->reference_count == 0))
1049     shutdown_incoming_processing (client);
1050 }
1051
1052
1053 /**
1054  * Obtain the network address of the other party.
1055  *
1056  * @param client the client to get the address for
1057  * @param addr where to store the address
1058  * @param addrlen where to store the length of the address
1059  * @return GNUNET_OK on success
1060  */
1061 int
1062 GNUNET_SERVER_client_get_address (struct GNUNET_SERVER_Client *client,
1063                                   void **addr, size_t * addrlen)
1064 {
1065   if (client->receive != &sock_receive)
1066     return GNUNET_SYSERR;       /* not a network client */
1067   return GNUNET_NETWORK_socket_get_address (client->client_closure,
1068                                             addr, addrlen);
1069 }
1070
1071
1072 /**
1073  * Ask the server to notify us whenever a client disconnects.
1074  * This function is called whenever the actual network connection
1075  * is closed; the reference count may be zero or larger than zero
1076  * at this point.
1077  *
1078  * @param server the server manageing the clients
1079  * @param callback function to call on disconnect
1080  * @param callback_cls closure for callback
1081  */
1082 void
1083 GNUNET_SERVER_disconnect_notify (struct GNUNET_SERVER_Handle *server,
1084                                  GNUNET_SERVER_DisconnectCallback callback,
1085                                  void *callback_cls)
1086 {
1087   struct NotifyList *n;
1088
1089   n = GNUNET_malloc (sizeof (struct NotifyList));
1090   n->callback = callback;
1091   n->callback_cls = callback_cls;
1092   n->next = server->disconnect_notify_list;
1093   server->disconnect_notify_list = n;
1094 }
1095
1096
1097 /**
1098  * Ask the server to disconnect from the given client.
1099  * This is the same as returning GNUNET_SYSERR from a message
1100  * handler, except that it allows dropping of a client even
1101  * when not handling a message from that client.
1102  *
1103  * @param client the client to disconnect from
1104  */
1105 void
1106 GNUNET_SERVER_client_disconnect (struct GNUNET_SERVER_Client *client)
1107 {
1108   if (client->server == NULL)
1109     return;                     /* already disconnected */
1110   GNUNET_assert (client->my_receive != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1111   client->receive_cancel (client->client_closure, client->my_receive);
1112   client->my_receive = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1113   shutdown_incoming_processing (client);
1114 }
1115
1116
1117 /**
1118  * Notify us when the server has enough space to transmit
1119  * a message of the given size to the given client.
1120  *
1121  * @param server the server to use
1122  * @param client client to transmit message to
1123  * @param size requested amount of buffer space
1124  * @param timeout after how long should we give up (and call
1125  *        notify with buf NULL and size 0)?
1126  * @param callback function to call when space is available
1127  * @param callback_cls closure for callback
1128  * @return non-NULL if the notify callback was queued; can be used
1129  *           to cancel the request using
1130  *           GNUNET_NETWORK_notify_transmit_ready_cancel.
1131  *         NULL if we are already going to notify someone else (busy)
1132  */
1133 struct GNUNET_NETWORK_TransmitHandle *
1134 GNUNET_SERVER_notify_transmit_ready (struct GNUNET_SERVER_Client *client,
1135                                      size_t size,
1136                                      struct GNUNET_TIME_Relative timeout,
1137                                      GNUNET_NETWORK_TransmitReadyNotify
1138                                      callback, void *callback_cls)
1139 {
1140   return client->notify_transmit_ready (client->client_closure,
1141                                         size,
1142                                         timeout, callback, callback_cls);
1143 }
1144
1145
1146 /**
1147  * Resume receiving from this client, we are done processing the
1148  * current request.  This function must be called from within each
1149  * GNUNET_SERVER_MessageCallback (or its respective continuations).
1150  *
1151  * @param client client we were processing a message of
1152  * @param success GNUNET_OK to keep the connection open and
1153  *                          continue to receive
1154  *                GNUNET_SYSERR to close the connection (signal
1155  *                          serious error)
1156  */
1157 void
1158 GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
1159 {
1160   char *sb;
1161
1162   if (client == NULL)
1163     return;
1164   GNUNET_assert (client->suspended > 0);
1165   client->suspended--;
1166   if (success != GNUNET_OK)
1167     client->shutdown_now = GNUNET_YES;
1168   if (client->suspended > 0)
1169     return;
1170   if (client->in_process_client_buffer == GNUNET_YES)
1171     return;
1172   if (client->side_buf_size > 0)
1173     {
1174       /* resume processing from side-buf */
1175       sb = client->side_buf;
1176       client->side_buf = NULL;
1177       /* this will also resume the receive job */
1178       if (GNUNET_YES != client->shutdown_now)
1179         process_incoming (client, sb, client->side_buf_size, NULL, 0, 0);
1180       else
1181         shutdown_incoming_processing (client);
1182       /* finally, free the side-buf */
1183       GNUNET_free (sb);
1184       return;
1185     }
1186   /* resume receive job */
1187   if (GNUNET_YES != client->shutdown_now)
1188     {
1189       GNUNET_SCHEDULER_add_continuation (client->server->sched,
1190                                          GNUNET_NO,
1191                                          &restart_processing,
1192                                          client,
1193                                          GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1194       return;
1195     }
1196   shutdown_incoming_processing (client);
1197 }
1198
1199
1200 /* end of server.c */