handling replies continuously from server
[oweals/gnunet.git] / src / util / server.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file util/server.c
23  * @brief library for building GNUnet network servers
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_protocols.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
33
34 #define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util", syscall)
35
36 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
37
38
39 /**
40  * List of arrays of message handlers.
41  */
42 struct HandlerList
43 {
44   /**
45    * This is a linked list.
46    */
47   struct HandlerList *next;
48
49   /**
50    * NULL-terminated array of handlers.
51    */
52   const struct GNUNET_SERVER_MessageHandler *handlers;
53 };
54
55
56 /**
57  * List of arrays of message handlers.
58  */
59 struct NotifyList
60 {
61   /**
62    * This is a doubly linked list.
63    */
64   struct NotifyList *next;
65
66   /**
67    * This is a doubly linked list.
68    */
69   struct NotifyList *prev;
70
71   /**
72    * Function to call.
73    */
74   GNUNET_SERVER_DisconnectCallback callback;
75
76   /**
77    * Closure for callback.
78    */
79   void *callback_cls;
80 };
81
82
83 /**
84  * @brief handle for a server
85  */
86 struct GNUNET_SERVER_Handle
87 {
88   /**
89    * List of handlers for incoming messages.
90    */
91   struct HandlerList *handlers;
92
93   /**
94    * Head of list of our current clients.
95    */
96   struct GNUNET_SERVER_Client *clients_head;
97
98   /**
99    * Head of list of our current clients.
100    */
101   struct GNUNET_SERVER_Client *clients_tail;
102
103   /**
104    * Head of linked list of functions to call on disconnects by clients.
105    */
106   struct NotifyList *disconnect_notify_list_head;
107
108   /**
109    * Tail of linked list of functions to call on disconnects by clients.
110    */
111   struct NotifyList *disconnect_notify_list_tail;
112
113   /**
114    * Function to call for access control.
115    */
116   GNUNET_CONNECTION_AccessCheck access;
117
118   /**
119    * Closure for access.
120    */
121   void *access_cls;
122
123   /**
124    * NULL-terminated array of sockets used to listen for new
125    * connections.
126    */
127   struct GNUNET_NETWORK_Handle **listen_sockets;
128
129   /**
130    * After how long should an idle connection time
131    * out (on write).
132    */
133   struct GNUNET_TIME_Relative idle_timeout;
134
135   /**
136    * Task scheduled to do the listening.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier listen_task;
139
140   /**
141    * Alternative function to create a MST instance.
142    */
143   GNUNET_SERVER_MstCreateCallback mst_create;
144
145   /**
146    * Alternative function to destroy a MST instance.
147    */
148   GNUNET_SERVER_MstDestroyCallback mst_destroy;
149
150   /**
151    * Alternative function to give data to a MST instance.
152    */
153   GNUNET_SERVER_MstReceiveCallback mst_receive;
154
155   /**
156    * Closure for 'mst_'-callbacks.
157    */
158   void *mst_cls;
159
160   /**
161    * Do we ignore messages of types that we do not understand or do we
162    * require that a handler is found (and if not kill the connection)?
163    */
164   int require_found;
165
166   /**
167    * Set to GNUNET_YES once we are in 'soft' shutdown where we wait for
168    * all non-monitor clients to disconnect before we call
169    * GNUNET_SERVER_destroy.  See 'test_monitor_clients'.  Set to
170    * GNUNET_SYSERR once the final destroy task has been scheduled
171    * (we cannot run it in the same task).
172    */
173   int in_soft_shutdown;
174 };
175
176
177 /**
178  * Handle server returns for aborting transmission to a client.
179  */
180 struct GNUNET_SERVER_TransmitHandle
181 {
182   /**
183    * Function to call to get the message.
184    */
185   GNUNET_CONNECTION_TransmitReadyNotify callback;
186
187   /**
188    * Closure for 'callback'
189    */
190   void *callback_cls;
191
192   /**
193    * Active connection transmission handle.
194    */
195   struct GNUNET_CONNECTION_TransmitHandle *cth;
196
197 };
198
199
200 /**
201  * @brief handle for a client of the server
202  */
203 struct GNUNET_SERVER_Client
204 {
205
206   /**
207    * This is a doubly linked list.
208    */
209   struct GNUNET_SERVER_Client *next;
210
211   /**
212    * This is a doubly linked list.
213    */
214   struct GNUNET_SERVER_Client *prev;
215
216   /**
217    * Processing of incoming data.
218    */
219   void *mst;
220
221   /**
222    * Server that this client belongs to.
223    */
224   struct GNUNET_SERVER_Handle *server;
225
226   /**
227    * Client closure for callbacks.
228    */
229   struct GNUNET_CONNECTION_Handle *connection;
230
231   /**
232    * ID of task used to restart processing.
233    */
234   GNUNET_SCHEDULER_TaskIdentifier restart_task;
235
236   /**
237    * Task that warns about missing calls to 'GNUNET_SERVER_receive_done'.
238    */
239   GNUNET_SCHEDULER_TaskIdentifier warn_task;
240
241   /**
242    * Time when the warn task was started.
243    */
244   struct GNUNET_TIME_Absolute warn_start;
245
246   /**
247    * Last activity on this socket (used to time it out
248    * if reference_count == 0).
249    */
250   struct GNUNET_TIME_Absolute last_activity;
251
252   /**
253    * Transmission handle we return for this client from
254    * GNUNET_SERVER_notify_transmit_ready.
255    */
256   struct GNUNET_SERVER_TransmitHandle th;
257
258   /**
259    * After how long should an idle connection time
260    * out (on write).
261    */
262   struct GNUNET_TIME_Relative idle_timeout;
263
264   /**
265    * Number of external entities with a reference to
266    * this client object.
267    */
268   unsigned int reference_count;
269
270   /**
271    * Was processing if incoming messages suspended while
272    * we were still processing data already received?
273    * This is a counter saying how often processing was
274    * suspended (once per handler invoked).
275    */
276   unsigned int suspended;
277
278   /**
279    * Are we currently in the "process_client_buffer" function (and
280    * will hence restart the receive job on exit if suspended == 0 once
281    * we are done?).  If this is set, then "receive_done" will
282    * essentially only decrement suspended; if this is not set, then
283    * "receive_done" may need to restart the receive process (either
284    * from the side-buffer or via select/recv).
285    */
286   int in_process_client_buffer;
287
288   /**
289    * We're about to close down this client.
290    */
291   int shutdown_now;
292
293   /**
294    * Are we currently trying to receive? (YES if we are, NO if we are not,
295    * SYSERR if data is already available in MST).
296    */
297   int receive_pending;
298
299   /**
300    * Finish pending write when disconnecting?
301    */
302   int finish_pending_write;
303
304   /**
305    * Persist the file handle for this client no matter what happens,
306    * force the OS to close once the process actually dies.  Should only
307    * be used in special cases!
308    */
309   int persist;
310
311   /**
312    * Is this client a 'monitor' client that should not be counted
313    * when deciding on destroying the server during soft shutdown?
314    * (see also GNUNET_SERVICE_start)
315    */
316   int is_monitor;
317
318   /**
319    * Type of last message processed (for warn_no_receive_done).
320    */
321   uint16_t warn_type;
322 };
323
324
325 /**
326  * Scheduler says our listen socket is ready.  Process it!
327  *
328  * @param cls handle to our server for which we are processing the listen
329  *        socket
330  * @param tc reason why we are running right now
331  */
332 static void
333 process_listen_socket (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
334 {
335   struct GNUNET_SERVER_Handle *server = cls;
336   struct GNUNET_CONNECTION_Handle *sock;
337   struct GNUNET_SERVER_Client *client;
338   struct GNUNET_NETWORK_FDSet *r;
339   unsigned int i;
340
341   server->listen_task = GNUNET_SCHEDULER_NO_TASK;
342   r = GNUNET_NETWORK_fdset_create ();
343   i = 0;
344   while (NULL != server->listen_sockets[i])
345     GNUNET_NETWORK_fdset_set (r, server->listen_sockets[i++]);
346   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
347   {
348     /* ignore shutdown, someone else will take care of it! */
349     server->listen_task =
350         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_HIGH,
351                                      GNUNET_TIME_UNIT_FOREVER_REL, r, NULL,
352                                      &process_listen_socket, server);
353     GNUNET_NETWORK_fdset_destroy (r);
354     return;
355   }
356   i = 0;
357   while (NULL != server->listen_sockets[i])
358   {
359     if (GNUNET_NETWORK_fdset_isset (tc->read_ready, server->listen_sockets[i]))
360     {
361       sock =
362           GNUNET_CONNECTION_create_from_accept (server->access,
363                                                 server->access_cls,
364                                                 server->listen_sockets[i]);
365       if (NULL != sock)
366       {
367         LOG (GNUNET_ERROR_TYPE_DEBUG, "Server accepted incoming connection.\n");
368         client = GNUNET_SERVER_connect_socket (server, sock);
369         /* decrement reference count, we don't keep "client" alive */
370         GNUNET_SERVER_client_drop (client);
371       }
372     }
373     i++;
374   }
375   /* listen for more! */
376   server->listen_task =
377       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_HIGH,
378                                    GNUNET_TIME_UNIT_FOREVER_REL, r, NULL,
379                                    &process_listen_socket, server);
380   GNUNET_NETWORK_fdset_destroy (r);
381 }
382
383
384 /**
385  * Create and initialize a listen socket for the server.
386  *
387  * @param serverAddr address to listen on
388  * @param socklen length of address
389  * @return NULL on error, otherwise the listen socket
390  */
391 static struct GNUNET_NETWORK_Handle *
392 open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
393 {
394   static int on = 1;
395   struct GNUNET_NETWORK_Handle *sock;
396   uint16_t port;
397   int eno;
398
399   switch (serverAddr->sa_family)
400   {
401   case AF_INET:
402     port = ntohs (((const struct sockaddr_in *) serverAddr)->sin_port);
403     break;
404   case AF_INET6:
405     port = ntohs (((const struct sockaddr_in6 *) serverAddr)->sin6_port);
406     break;
407   case AF_UNIX:
408     port = 0;
409     break;
410   default:
411     GNUNET_break (0);
412     port = 0;
413     break;
414   }
415   sock = GNUNET_NETWORK_socket_create (serverAddr->sa_family, SOCK_STREAM, 0);
416   if (NULL == sock)
417   {
418     LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "socket");
419     errno = 0;
420     return NULL;
421   }
422   if (0 != port)
423   {
424     if (GNUNET_NETWORK_socket_setsockopt
425         (sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) != GNUNET_OK)
426       LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
427                     "setsockopt");
428 #ifdef IPV6_V6ONLY
429     if ((AF_INET6 == serverAddr->sa_family) &&
430         (GNUNET_NETWORK_socket_setsockopt
431          (sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof (on)) != GNUNET_OK))
432       LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
433                     "setsockopt");
434 #endif
435   }
436   /* bind the socket */
437   if (GNUNET_OK != GNUNET_NETWORK_socket_bind (sock, serverAddr, socklen))
438   {
439     eno = errno;
440     if (EADDRINUSE != errno)
441     {
442       /* we don't log 'EADDRINUSE' here since an IPv4 bind may
443        * fail if we already took the port on IPv6; if both IPv4 and
444        * IPv6 binds fail, then our caller will log using the
445        * errno preserved in 'eno' */
446       LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "bind");
447       if (0 != port)
448         LOG (GNUNET_ERROR_TYPE_ERROR, _("`%s' failed for port %d (%s).\n"),
449              "bind", port,
450              (AF_INET == serverAddr->sa_family) ? "IPv4" : "IPv6");
451       eno = 0;
452     }
453     else
454     {
455       if (0 != port)
456         LOG (GNUNET_ERROR_TYPE_WARNING,
457              _("`%s' failed for port %d (%s): address already in use\n"),
458              "bind", port,
459              (AF_INET == serverAddr->sa_family) ? "IPv4" : "IPv6");
460       else if (AF_UNIX == serverAddr->sa_family)
461         LOG (GNUNET_ERROR_TYPE_WARNING,
462              _("`%s' failed for `%s': address already in use\n"), "bind",
463              ((const struct sockaddr_un *) serverAddr)->sun_path);
464
465     }
466     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
467     errno = eno;
468     return NULL;
469   }
470   if (GNUNET_OK != GNUNET_NETWORK_socket_listen (sock, 5))
471   {
472     LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "listen");
473     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
474     errno = 0;
475     return NULL;
476   }
477   if (0 != port)
478     LOG (GNUNET_ERROR_TYPE_DEBUG, "Server starts to listen on port %u.\n",
479          port);
480   return sock;
481 }
482
483
484 /**
485  * Create a new server.
486  *
487  * @param access function for access control
488  * @param access_cls closure for access
489  * @param lsocks NULL-terminated array of listen sockets
490  * @param idle_timeout after how long should we timeout idle connections?
491  * @param require_found if YES, connections sending messages of unknown type
492  *        will be closed
493  * @return handle for the new server, NULL on error
494  *         (typically, "port" already in use)
495  */
496 struct GNUNET_SERVER_Handle *
497 GNUNET_SERVER_create_with_sockets (GNUNET_CONNECTION_AccessCheck access,
498                                    void *access_cls,
499                                    struct GNUNET_NETWORK_Handle **lsocks,
500                                    struct GNUNET_TIME_Relative idle_timeout,
501                                    int require_found)
502 {
503   struct GNUNET_SERVER_Handle *server;
504   struct GNUNET_NETWORK_FDSet *r;
505   int i;
506
507   server = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Handle));
508   server->idle_timeout = idle_timeout;
509   server->listen_sockets = lsocks;
510   server->access = access;
511   server->access_cls = access_cls;
512   server->require_found = require_found;
513   if (NULL != lsocks)
514   {
515     r = GNUNET_NETWORK_fdset_create ();
516     i = 0;
517     while (NULL != server->listen_sockets[i])
518       GNUNET_NETWORK_fdset_set (r, server->listen_sockets[i++]);
519     server->listen_task =
520         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_HIGH,
521                                      GNUNET_TIME_UNIT_FOREVER_REL, r, NULL,
522                                      &process_listen_socket, server);
523     GNUNET_NETWORK_fdset_destroy (r);
524   }
525   return server;
526 }
527
528
529 /**
530  * Create a new server.
531  *
532  * @param access function for access control
533  * @param access_cls closure for access
534  * @param serverAddr address to listen on (including port), NULL terminated array
535  * @param socklen length of serverAddr
536  * @param idle_timeout after how long should we timeout idle connections?
537  * @param require_found if YES, connections sending messages of unknown type
538  *        will be closed
539  * @return handle for the new server, NULL on error
540  *         (typically, "port" already in use)
541  */
542 struct GNUNET_SERVER_Handle *
543 GNUNET_SERVER_create (GNUNET_CONNECTION_AccessCheck access, void *access_cls,
544                       struct sockaddr *const *serverAddr,
545                       const socklen_t * socklen,
546                       struct GNUNET_TIME_Relative idle_timeout,
547                       int require_found)
548 {
549   struct GNUNET_NETWORK_Handle **lsocks;
550   unsigned int i;
551   unsigned int j;
552   unsigned int k;
553   int seen;
554
555   i = 0;
556   while (NULL != serverAddr[i])
557     i++;
558   if (i > 0)
559   {
560     lsocks = GNUNET_malloc (sizeof (struct GNUNET_NETWORK_Handle *) * (i + 1));
561     i = 0;
562     j = 0;
563     while (NULL != serverAddr[i])
564     {
565       seen = 0;
566       for (k=0;k<i;k++)
567         if ( (socklen[k] == socklen[i]) &&
568              (0 == memcmp (serverAddr[k], serverAddr[i], socklen[i])) )
569         {
570           seen = 1;
571           break;
572         }
573       if (0 != seen)
574       {
575         /* duplicate address, skip */
576         i++;
577         continue;
578       }
579       lsocks[j] = open_listen_socket (serverAddr[i], socklen[i]);
580       if (NULL != lsocks[j])
581         j++;
582       i++;
583     }
584     if (0 == j)
585     {
586       if (0 != errno)
587         LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "bind");
588       GNUNET_free (lsocks);
589       lsocks = NULL;
590     }
591   }
592   else
593   {
594     lsocks = NULL;
595   }
596   return GNUNET_SERVER_create_with_sockets (access, access_cls, lsocks,
597                                             idle_timeout, require_found);
598 }
599
600
601 /**
602  * Set the 'monitor' flag on this client.  Clients which have been
603  * marked as 'monitors' won't prevent the server from shutting down
604  * once 'GNUNET_SERVER_stop_listening' has been invoked.  The idea is
605  * that for "normal" clients we likely want to allow them to process
606  * their requests; however, monitor-clients are likely to 'never'
607  * disconnect during shutdown and thus will not be considered when
608  * determining if the server should continue to exist after
609  * 'GNUNET_SERVER_destroy' has been called.
610  *
611  * @param client the client to set the 'monitor' flag on
612  */
613 void
614 GNUNET_SERVER_client_mark_monitor (struct GNUNET_SERVER_Client *client)
615 {
616   client->is_monitor = GNUNET_YES;
617 }
618
619
620 /**
621  * Helper function for 'test_monitor_clients' to trigger
622  * 'GNUNET_SERVER_destroy' after the stack has unwound.
623  *
624  * @param cls the 'struct GNUNET_SERVER_Handle' to destroy
625  * @param tc unused
626  */
627 static void
628 do_destroy (void *cls,
629             const struct GNUNET_SCHEDULER_TaskContext *tc)
630 {
631   struct GNUNET_SERVER_Handle *server = cls;
632   GNUNET_SERVER_destroy (server);
633 }
634
635
636 /**
637  * Check if only 'monitor' clients are left.  If so, destroy the
638  * server completely.
639  *
640  * @param server server to test for full shutdown
641  */
642 static void
643 test_monitor_clients (struct GNUNET_SERVER_Handle *server)
644 {
645   struct GNUNET_SERVER_Client *client;
646
647   if (GNUNET_YES != server->in_soft_shutdown)
648     return;
649   for (client = server->clients_head; NULL != client; client = client->next)
650     if (GNUNET_NO == client->is_monitor)
651       return; /* not done yet */
652   server->in_soft_shutdown = GNUNET_SYSERR;
653   GNUNET_SCHEDULER_add_continuation (&do_destroy, server,
654                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
655 }
656
657
658 /**
659  * Stop the listen socket and get ready to shutdown the server
660  * once only 'monitor' clients are left.
661  *
662  * @param server server to stop listening on
663  */
664 void
665 GNUNET_SERVER_stop_listening (struct GNUNET_SERVER_Handle *server)
666 {
667   unsigned int i;
668
669   LOG (GNUNET_ERROR_TYPE_DEBUG, "Server in soft shutdown\n");
670   if (GNUNET_SCHEDULER_NO_TASK != server->listen_task)
671   {
672     GNUNET_SCHEDULER_cancel (server->listen_task);
673     server->listen_task = GNUNET_SCHEDULER_NO_TASK;
674   }
675   if (NULL != server->listen_sockets)
676   {
677     i = 0;
678     while (NULL != server->listen_sockets[i])
679       GNUNET_break (GNUNET_OK ==
680                     GNUNET_NETWORK_socket_close (server->listen_sockets[i++]));
681     GNUNET_free (server->listen_sockets);
682     server->listen_sockets = NULL;
683   }
684   if (GNUNET_NO == server->in_soft_shutdown)
685     server->in_soft_shutdown = GNUNET_YES;
686   test_monitor_clients (server);
687 }
688
689
690 /**
691  * Free resources held by this server.
692  *
693  * @param server server to destroy
694  */
695 void
696 GNUNET_SERVER_destroy (struct GNUNET_SERVER_Handle *server)
697 {
698   struct HandlerList *hpos;
699   struct NotifyList *npos;
700   unsigned int i;
701
702   LOG (GNUNET_ERROR_TYPE_DEBUG, "Server shutting down.\n");
703   if (GNUNET_SCHEDULER_NO_TASK != server->listen_task)
704   {
705     GNUNET_SCHEDULER_cancel (server->listen_task);
706     server->listen_task = GNUNET_SCHEDULER_NO_TASK;
707   }
708   if (NULL != server->listen_sockets)
709   {
710     i = 0;
711     while (NULL != server->listen_sockets[i])
712       GNUNET_break (GNUNET_OK ==
713                     GNUNET_NETWORK_socket_close (server->listen_sockets[i++]));
714     GNUNET_free (server->listen_sockets);
715     server->listen_sockets = NULL;
716   }
717   while (NULL != server->clients_head)
718     GNUNET_SERVER_client_disconnect (server->clients_head);
719   while (NULL != (hpos = server->handlers))
720   {
721     server->handlers = hpos->next;
722     GNUNET_free (hpos);
723   }
724   while (NULL != (npos = server->disconnect_notify_list_head))
725   {
726     npos->callback (npos->callback_cls, NULL);
727     GNUNET_CONTAINER_DLL_remove (server->disconnect_notify_list_head,
728                                  server->disconnect_notify_list_tail,
729                                  npos);
730     GNUNET_free (npos);
731   }
732   GNUNET_free (server);
733 }
734
735
736 /**
737  * Add additional handlers to an existing server.
738  *
739  * @param server the server to add handlers to
740  * @param handlers array of message handlers for
741  *        incoming messages; the last entry must
742  *        have "NULL" for the "callback"; multiple
743  *        entries for the same type are allowed,
744  *        they will be called in order of occurence.
745  *        These handlers can be removed later;
746  *        the handlers array must exist until removed
747  *        (or server is destroyed).
748  */
749 void
750 GNUNET_SERVER_add_handlers (struct GNUNET_SERVER_Handle *server,
751                             const struct GNUNET_SERVER_MessageHandler *handlers)
752 {
753   struct HandlerList *p;
754
755   p = GNUNET_malloc (sizeof (struct HandlerList));
756   p->handlers = handlers;
757   p->next = server->handlers;
758   server->handlers = p;
759 }
760
761
762 /**
763  * Change functions used by the server to tokenize the message stream.
764  * (very rarely used).
765  *
766  * @param server server to modify
767  * @param create new tokenizer initialization function
768  * @param destroy new tokenizer destruction function
769  * @param receive new tokenizer receive function
770  * @param cls closure for 'create', 'receive', 'destroy' 
771  */
772 void
773 GNUNET_SERVER_set_callbacks (struct GNUNET_SERVER_Handle *server,
774                              GNUNET_SERVER_MstCreateCallback create,
775                              GNUNET_SERVER_MstDestroyCallback destroy,
776                              GNUNET_SERVER_MstReceiveCallback receive,
777                              void *cls)
778 {
779   server->mst_create = create;
780   server->mst_destroy = destroy;
781   server->mst_receive = receive;
782   server->mst_cls = cls;
783 }
784
785
786 /**
787  * Task run to warn about missing calls to 'GNUNET_SERVER_receive_done'.
788  *
789  * @param cls our 'struct GNUNET_SERVER_Client*' to process more requests from
790  * @param tc scheduler context (unused)
791  */
792 static void
793 warn_no_receive_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
794 {
795   struct GNUNET_SERVER_Client *client = cls;
796
797   client->warn_task =
798       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
799                                     &warn_no_receive_done, client);
800   if (0 == (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
801     LOG (GNUNET_ERROR_TYPE_WARNING,
802          _
803          ("Processing code for message of type %u did not call GNUNET_SERVER_receive_done after %llums\n"),
804          (unsigned int) client->warn_type,
805          (unsigned long long)
806          GNUNET_TIME_absolute_get_duration (client->warn_start).rel_value);
807 }
808
809
810 /**
811  * Disable the warning the server issues if a message is not acknowledged
812  * in a timely fashion.  Use this call if a client is intentionally delayed
813  * for a while.  Only applies to the current message.
814  *
815  * @param client client for which to disable the warning
816  */
817 void
818 GNUNET_SERVER_disable_receive_done_warning (struct GNUNET_SERVER_Client *client)
819 {
820   if (GNUNET_SCHEDULER_NO_TASK != client->warn_task)
821   {
822     GNUNET_SCHEDULER_cancel (client->warn_task);
823     client->warn_task = GNUNET_SCHEDULER_NO_TASK;
824   }
825 }
826
827
828 /**
829  * Inject a message into the server, pretend it came
830  * from the specified client.  Delivery of the message
831  * will happen instantly (if a handler is installed;
832  * otherwise the call does nothing).
833  *
834  * @param server the server receiving the message
835  * @param sender the "pretended" sender of the message
836  *        can be NULL!
837  * @param message message to transmit
838  * @return GNUNET_OK if the message was OK and the
839  *                   connection can stay open
840  *         GNUNET_SYSERR if the connection to the
841  *         client should be shut down
842  */
843 int
844 GNUNET_SERVER_inject (struct GNUNET_SERVER_Handle *server,
845                       struct GNUNET_SERVER_Client *sender,
846                       const struct GNUNET_MessageHeader *message)
847 {
848   struct HandlerList *pos;
849   const struct GNUNET_SERVER_MessageHandler *mh;
850   unsigned int i;
851   uint16_t type;
852   uint16_t size;
853   int found;
854
855   type = ntohs (message->type);
856   size = ntohs (message->size);
857   LOG (GNUNET_ERROR_TYPE_DEBUG,
858        "Server schedules transmission of %u-byte message of type %u to client.\n",
859        size, type);
860   found = GNUNET_NO;
861   for (pos = server->handlers; NULL != pos; pos = pos->next)
862   {
863     i = 0;
864     while (pos->handlers[i].callback != NULL)
865     {
866       mh = &pos->handlers[i];
867       if ((mh->type == type) || (mh->type == GNUNET_MESSAGE_TYPE_ALL))
868       {
869         if ((0 != mh->expected_size) && (mh->expected_size != size))
870         {
871 #if GNUNET8_NETWORK_IS_DEAD
872           LOG (GNUNET_ERROR_TYPE_WARNING,
873                "Expected %u bytes for message of type %u, got %u\n",
874                mh->expected_size, mh->type, size);
875           GNUNET_break_op (0);
876 #endif
877           return GNUNET_SYSERR;
878         }
879         if (NULL != sender)
880         {
881           if (0 == sender->suspended)
882           {
883             sender->warn_start = GNUNET_TIME_absolute_get ();
884             sender->warn_task =
885                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
886                                               &warn_no_receive_done, sender);
887             sender->warn_type = type;
888           }
889           sender->suspended++;
890         }
891         mh->callback (mh->callback_cls, sender, message);
892         found = GNUNET_YES;
893       }
894       i++;
895     }
896   }
897   if (GNUNET_NO == found)
898   {
899     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
900          "Received message of unknown type %d\n", type);
901     if (GNUNET_YES == server->require_found)
902       return GNUNET_SYSERR;
903   }
904   return GNUNET_OK;
905 }
906
907
908 /**
909  * We are receiving an incoming message.  Process it.
910  *
911  * @param cls our closure (handle for the client)
912  * @param buf buffer with data received from network
913  * @param available number of bytes available in buf
914  * @param addr address of the sender
915  * @param addrlen length of addr
916  * @param errCode code indicating errors receiving, 0 for success
917  */
918 static void
919 process_incoming (void *cls, const void *buf, size_t available,
920                   const struct sockaddr *addr, socklen_t addrlen, int errCode);
921
922
923 /**
924  * Process messages from the client's message tokenizer until either
925  * the tokenizer is empty (and then schedule receiving more), or
926  * until some handler is not immediately done (then wait for restart_processing)
927  * or shutdown.
928  *
929  * @param client the client to process, RC must have already been increased
930  *        using GNUNET_SERVER_client_keep and will be decreased by one in this
931  *        function
932  * @param ret GNUNET_NO to start processing from the buffer,
933  *            GNUNET_OK if the mst buffer is drained and we should instantly go back to receiving
934  *            GNUNET_SYSERR if we should instantly abort due to error in a previous step
935  */
936 static void
937 process_mst (struct GNUNET_SERVER_Client *client, int ret)
938 {
939   while ((GNUNET_SYSERR != ret) && (NULL != client->server) &&
940          (GNUNET_YES != client->shutdown_now) && (0 == client->suspended))
941   {
942     if (GNUNET_OK == ret)
943     {
944       LOG (GNUNET_ERROR_TYPE_DEBUG,
945            "Server re-enters receive loop, timeout: %llu.\n",
946            client->idle_timeout.rel_value);
947       client->receive_pending = GNUNET_YES;
948       GNUNET_CONNECTION_receive (client->connection,
949                                  GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
950                                  client->idle_timeout, &process_incoming,
951                                  client);
952       break;
953     }
954     LOG (GNUNET_ERROR_TYPE_DEBUG,
955          "Server processes additional messages instantly.\n");
956     if (NULL != client->server->mst_receive)
957       ret =
958           client->server->mst_receive (client->server->mst_cls, client->mst,
959                                        client, NULL, 0, GNUNET_NO, GNUNET_YES);
960     else
961       ret =
962           GNUNET_SERVER_mst_receive (client->mst, client, NULL, 0, GNUNET_NO,
963                                      GNUNET_YES);
964   }
965   LOG (GNUNET_ERROR_TYPE_DEBUG,
966        "Server leaves instant processing loop: ret = %d, server = %p, shutdown = %d, suspended = %u\n",
967        ret, client->server, client->shutdown_now, client->suspended);
968   if (GNUNET_NO == ret)
969   {
970     LOG (GNUNET_ERROR_TYPE_DEBUG,
971          "Server has more data pending but is suspended.\n");
972     client->receive_pending = GNUNET_SYSERR;    /* data pending */
973   }
974   if ((GNUNET_SYSERR == ret) || (GNUNET_YES == client->shutdown_now))
975     GNUNET_SERVER_client_disconnect (client);
976 }
977
978
979 /**
980  * We are receiving an incoming message.  Process it.
981  *
982  * @param cls our closure (handle for the client)
983  * @param buf buffer with data received from network
984  * @param available number of bytes available in buf
985  * @param addr address of the sender
986  * @param addrlen length of addr
987  * @param errCode code indicating errors receiving, 0 for success
988  */
989 static void
990 process_incoming (void *cls, const void *buf, size_t available,
991                   const struct sockaddr *addr, socklen_t addrlen, int errCode)
992 {
993   struct GNUNET_SERVER_Client *client = cls;
994   struct GNUNET_SERVER_Handle *server = client->server;
995   struct GNUNET_TIME_Absolute end;
996   struct GNUNET_TIME_Absolute now;
997   int ret;
998
999   GNUNET_assert (GNUNET_YES == client->receive_pending);
1000   client->receive_pending = GNUNET_NO;
1001   now = GNUNET_TIME_absolute_get ();
1002   end = GNUNET_TIME_absolute_add (client->last_activity, client->idle_timeout);
1003
1004   if ((NULL == buf) && (0 == available) && (NULL == addr) && (0 == errCode) &&
1005       (GNUNET_YES != client->shutdown_now) && (NULL != server) &&
1006       (GNUNET_YES == GNUNET_CONNECTION_check (client->connection)) &&
1007       (end.abs_value > now.abs_value))
1008   {
1009     /* wait longer, timeout changed (i.e. due to us sending) */
1010     LOG (GNUNET_ERROR_TYPE_DEBUG,
1011          "Receive time out, but no disconnect due to sending (%p)\n",
1012          GNUNET_a2s (addr, addrlen));
1013     client->receive_pending = GNUNET_YES;
1014     GNUNET_CONNECTION_receive (client->connection,
1015                                GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
1016                                GNUNET_TIME_absolute_get_remaining (end),
1017                                &process_incoming, client);
1018     return;
1019   }
1020   if ((NULL == buf) || (0 == available) || (0 != errCode) || (NULL == server) ||
1021       (GNUNET_YES == client->shutdown_now) ||
1022       (GNUNET_YES != GNUNET_CONNECTION_check (client->connection)))
1023   {
1024     /* other side closed connection, error connecting, etc. */
1025     GNUNET_SERVER_client_disconnect (client);
1026     return;
1027   }
1028   LOG (GNUNET_ERROR_TYPE_DEBUG, "Server receives %u bytes from `%s'.\n",
1029        (unsigned int) available, GNUNET_a2s (addr, addrlen));
1030   GNUNET_SERVER_client_keep (client);
1031   client->last_activity = now;
1032
1033   if (NULL != server->mst_receive)
1034     ret =
1035         client->server->mst_receive (client->server->mst_cls, client->mst,
1036                                      client, buf, available, GNUNET_NO, GNUNET_YES);
1037   else
1038     ret =
1039         GNUNET_SERVER_mst_receive (client->mst, client, buf, available, GNUNET_NO,
1040                                    GNUNET_YES);
1041   process_mst (client, ret);
1042   GNUNET_SERVER_client_drop (client);
1043 }
1044
1045
1046 /**
1047  * Task run to start again receiving from the network
1048  * and process requests.
1049  *
1050  * @param cls our 'struct GNUNET_SERVER_Client*' to process more requests from
1051  * @param tc scheduler context (unused)
1052  */
1053 static void
1054 restart_processing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1055 {
1056   struct GNUNET_SERVER_Client *client = cls;
1057
1058   GNUNET_assert (GNUNET_YES != client->shutdown_now);
1059   client->restart_task = GNUNET_SCHEDULER_NO_TASK;
1060   if (GNUNET_NO == client->receive_pending)
1061   {
1062     LOG (GNUNET_ERROR_TYPE_DEBUG, "Server begins to read again from client.\n");
1063     client->receive_pending = GNUNET_YES;
1064     GNUNET_CONNECTION_receive (client->connection,
1065                                GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
1066                                client->idle_timeout, &process_incoming, client);
1067     return;
1068   }
1069   LOG (GNUNET_ERROR_TYPE_DEBUG,
1070        "Server continues processing messages still in the buffer.\n");
1071   GNUNET_SERVER_client_keep (client);
1072   client->receive_pending = GNUNET_NO;
1073   process_mst (client, GNUNET_NO);
1074   GNUNET_SERVER_client_drop (client);
1075 }
1076
1077
1078 /**
1079  * This function is called whenever our inbound message tokenizer has
1080  * received a complete message.
1081  *
1082  * @param cls closure (struct GNUNET_SERVER_Handle)
1083  * @param client identification of the client (struct GNUNET_SERVER_Client*)
1084  * @param message the actual message
1085  */
1086 static void
1087 client_message_tokenizer_callback (void *cls, void *client,
1088                                    const struct GNUNET_MessageHeader *message)
1089 {
1090   struct GNUNET_SERVER_Handle *server = cls;
1091   struct GNUNET_SERVER_Client *sender = client;
1092   int ret;
1093
1094   LOG (GNUNET_ERROR_TYPE_DEBUG,
1095        "Tokenizer gives server message of type %u from client\n",
1096        ntohs (message->type));
1097   sender->in_process_client_buffer = GNUNET_YES;
1098   ret = GNUNET_SERVER_inject (server, sender, message);
1099   sender->in_process_client_buffer = GNUNET_NO;
1100   if ( (GNUNET_OK != ret) || (GNUNET_YES == sender->shutdown_now) )
1101     GNUNET_SERVER_client_disconnect (sender);
1102 }
1103
1104
1105 /**
1106  * Add a TCP socket-based connection to the set of handles managed by
1107  * this server.  Use this function for outgoing (P2P) connections that
1108  * we initiated (and where this server should process incoming
1109  * messages).
1110  *
1111  * @param server the server to use
1112  * @param connection the connection to manage (client must
1113  *        stop using this connection from now on)
1114  * @return the client handle (client should call
1115  *         "client_drop" on the return value eventually)
1116  */
1117 struct GNUNET_SERVER_Client *
1118 GNUNET_SERVER_connect_socket (struct GNUNET_SERVER_Handle *server,
1119                               struct GNUNET_CONNECTION_Handle *connection)
1120 {
1121   struct GNUNET_SERVER_Client *client;
1122
1123   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
1124   client->connection = connection;
1125   client->reference_count = 1;
1126   client->server = server;
1127   client->last_activity = GNUNET_TIME_absolute_get ();
1128   client->idle_timeout = server->idle_timeout;
1129   GNUNET_CONTAINER_DLL_insert (server->clients_head,
1130                                server->clients_tail,
1131                                client);
1132   if (NULL != server->mst_create)
1133     client->mst =
1134         server->mst_create (server->mst_cls, client);
1135   else
1136     client->mst =
1137         GNUNET_SERVER_mst_create (&client_message_tokenizer_callback, server);
1138   GNUNET_assert (NULL != client->mst);
1139   client->receive_pending = GNUNET_YES;
1140   GNUNET_CONNECTION_receive (client->connection,
1141                              GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
1142                              client->idle_timeout, &process_incoming, client);
1143   return client;
1144 }
1145
1146
1147 /**
1148  * Change the timeout for a particular client.  Decreasing the timeout
1149  * may not go into effect immediately (only after the previous timeout
1150  * times out or activity happens on the socket).
1151  *
1152  * @param client the client to update
1153  * @param timeout new timeout for activities on the socket
1154  */
1155 void
1156 GNUNET_SERVER_client_set_timeout (struct GNUNET_SERVER_Client *client,
1157                                   struct GNUNET_TIME_Relative timeout)
1158 {
1159   client->idle_timeout = timeout;
1160 }
1161
1162
1163 /**
1164  * Notify the server that the given client handle should
1165  * be kept (keeps the connection up if possible, increments
1166  * the internal reference counter).
1167  *
1168  * @param client the client to keep
1169  */
1170 void
1171 GNUNET_SERVER_client_keep (struct GNUNET_SERVER_Client *client)
1172 {
1173   client->reference_count++;
1174 }
1175
1176
1177 /**
1178  * Notify the server that the given client handle is no
1179  * longer required.  Decrements the reference counter.  If
1180  * that counter reaches zero an inactive connection maybe
1181  * closed.
1182  *
1183  * @param client the client to drop
1184  */
1185 void
1186 GNUNET_SERVER_client_drop (struct GNUNET_SERVER_Client *client)
1187 {
1188   GNUNET_assert (client->reference_count > 0);
1189   client->reference_count--;
1190   if ((GNUNET_YES == client->shutdown_now) && (0 == client->reference_count))
1191     GNUNET_SERVER_client_disconnect (client);
1192 }
1193
1194
1195 /**
1196  * Obtain the network address of the other party.
1197  *
1198  * @param client the client to get the address for
1199  * @param addr where to store the address
1200  * @param addrlen where to store the length of the address
1201  * @return GNUNET_OK on success
1202  */
1203 int
1204 GNUNET_SERVER_client_get_address (struct GNUNET_SERVER_Client *client,
1205                                   void **addr, size_t * addrlen)
1206 {
1207   return GNUNET_CONNECTION_get_address (client->connection, addr, addrlen);
1208 }
1209
1210
1211 /**
1212  * Ask the server to notify us whenever a client disconnects.
1213  * This function is called whenever the actual network connection
1214  * is closed; the reference count may be zero or larger than zero
1215  * at this point.
1216  *
1217  * @param server the server manageing the clients
1218  * @param callback function to call on disconnect
1219  * @param callback_cls closure for callback
1220  */
1221 void
1222 GNUNET_SERVER_disconnect_notify (struct GNUNET_SERVER_Handle *server,
1223                                  GNUNET_SERVER_DisconnectCallback callback,
1224                                  void *callback_cls)
1225 {
1226   struct NotifyList *n;
1227
1228   n = GNUNET_malloc (sizeof (struct NotifyList));
1229   n->callback = callback;
1230   n->callback_cls = callback_cls;
1231   GNUNET_CONTAINER_DLL_insert (server->disconnect_notify_list_head,
1232                                server->disconnect_notify_list_tail,
1233                                n);
1234 }
1235
1236
1237 /**
1238  * Ask the server to stop notifying us whenever a client disconnects.
1239  *
1240  * @param server the server manageing the clients
1241  * @param callback function to call on disconnect
1242  * @param callback_cls closure for callback
1243  */
1244 void
1245 GNUNET_SERVER_disconnect_notify_cancel (struct GNUNET_SERVER_Handle *server,
1246                                         GNUNET_SERVER_DisconnectCallback
1247                                         callback, void *callback_cls)
1248 {
1249   struct NotifyList *pos;
1250
1251   for (pos = server->disconnect_notify_list_head; NULL != pos; pos = pos->next)
1252     if ((pos->callback == callback) && (pos->callback_cls == callback_cls))
1253       break;
1254   if (NULL == pos)
1255   {
1256     GNUNET_break (0);
1257     return;
1258   }
1259   GNUNET_CONTAINER_DLL_remove (server->disconnect_notify_list_head,
1260                                server->disconnect_notify_list_tail,
1261                                pos);
1262   GNUNET_free (pos);
1263 }
1264
1265
1266 /**
1267  * Destroy the connection that is passed in via 'cls'.  Used
1268  * as calling 'GNUNET_CONNECTION_destroy' from within a function
1269  * that was itself called from within 'process_notify' of
1270  * 'connection.c' is not allowed (see #2329).
1271  *
1272  * @param cls connection to destroy
1273  * @param tc scheduler context (unused)
1274  */
1275 static void
1276 destroy_connection (void *cls,
1277                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1278 {
1279   struct GNUNET_CONNECTION_Handle *connection = cls;
1280   
1281   GNUNET_CONNECTION_destroy (connection);
1282 }
1283
1284
1285 /**
1286  * Ask the server to disconnect from the given client.
1287  * This is the same as returning GNUNET_SYSERR from a message
1288  * handler, except that it allows dropping of a client even
1289  * when not handling a message from that client.
1290  *
1291  * @param client the client to disconnect from
1292  */
1293 void
1294 GNUNET_SERVER_client_disconnect (struct GNUNET_SERVER_Client *client)
1295 {
1296   struct GNUNET_SERVER_Handle *server = client->server;
1297   struct NotifyList *n;
1298
1299   LOG (GNUNET_ERROR_TYPE_DEBUG,
1300        "Client is being disconnected from the server.\n");
1301   if (GNUNET_SCHEDULER_NO_TASK != client->restart_task)
1302   {
1303     GNUNET_SCHEDULER_cancel (client->restart_task);
1304     client->restart_task = GNUNET_SCHEDULER_NO_TASK;
1305   }
1306   if (GNUNET_SCHEDULER_NO_TASK != client->warn_task)
1307   {
1308     GNUNET_SCHEDULER_cancel (client->warn_task);
1309     client->warn_task = GNUNET_SCHEDULER_NO_TASK;
1310   }
1311   if (GNUNET_YES == client->receive_pending)
1312   {
1313     GNUNET_CONNECTION_receive_cancel (client->connection);
1314     client->receive_pending = GNUNET_NO;
1315   }
1316   client->shutdown_now = GNUNET_YES;    
1317   client->reference_count++; /* make sure nobody else clean up client... */
1318   if ( (NULL != client->mst) &&
1319        (NULL != server) )
1320   {
1321     GNUNET_CONTAINER_DLL_remove (server->clients_head,
1322                                  server->clients_tail,
1323                                  client);
1324     if (GNUNET_SCHEDULER_NO_TASK != client->restart_task)
1325     {
1326       GNUNET_SCHEDULER_cancel (client->restart_task);
1327       client->restart_task = GNUNET_SCHEDULER_NO_TASK;
1328     }
1329     if (GNUNET_SCHEDULER_NO_TASK != client->warn_task)
1330     {
1331       GNUNET_SCHEDULER_cancel (client->warn_task);
1332       client->warn_task = GNUNET_SCHEDULER_NO_TASK;
1333     }
1334     if (NULL != server->mst_destroy)
1335       server->mst_destroy (server->mst_cls, client->mst);
1336     else
1337       GNUNET_SERVER_mst_destroy (client->mst);
1338     client->mst = NULL;
1339     for (n = server->disconnect_notify_list_head; NULL != n; n = n->next)
1340       n->callback (n->callback_cls, client);
1341   }
1342   client->reference_count--;
1343   if (client->reference_count > 0)
1344   {
1345     LOG (GNUNET_ERROR_TYPE_DEBUG,
1346          "RC still positive, not destroying everything.\n");
1347     client->server = NULL;
1348     return;
1349   }
1350   if (GNUNET_YES == client->in_process_client_buffer)
1351   {
1352     LOG (GNUNET_ERROR_TYPE_DEBUG,
1353          "Still processing inputs, not destroying everything.\n");
1354     return;
1355   }
1356   if (GNUNET_YES == client->persist)
1357     GNUNET_CONNECTION_persist_ (client->connection);
1358   if (NULL != client->th.cth)
1359     GNUNET_SERVER_notify_transmit_ready_cancel (&client->th);
1360   (void) GNUNET_SCHEDULER_add_now (&destroy_connection,
1361                                    client->connection);
1362   GNUNET_free (client);
1363   /* we might be in soft-shutdown, test if we're done */
1364   if (NULL != server)
1365     test_monitor_clients (server);
1366 }
1367
1368
1369 /**
1370  * Disable the "CORK" feature for communication with the given client,
1371  * forcing the OS to immediately flush the buffer on transmission
1372  * instead of potentially buffering multiple messages.
1373  *
1374  * @param client handle to the client
1375  * @return GNUNET_OK on success
1376  */
1377 int
1378 GNUNET_SERVER_client_disable_corking (struct GNUNET_SERVER_Client *client)
1379 {
1380   return GNUNET_CONNECTION_disable_corking (client->connection);
1381 }
1382
1383
1384 /**
1385  * Wrapper for transmission notification that calls the original
1386  * callback and update the last activity time for our connection.
1387  *
1388  * @param cls the 'struct GNUNET_SERVER_Client'
1389  * @param size number of bytes we can transmit
1390  * @param buf where to copy the message
1391  * @return number of bytes actually transmitted
1392  */
1393 static size_t
1394 transmit_ready_callback_wrapper (void *cls, size_t size, void *buf)
1395 {
1396   struct GNUNET_SERVER_Client *client = cls;
1397   GNUNET_CONNECTION_TransmitReadyNotify callback;
1398
1399   client->th.cth = NULL;
1400   callback = client->th.callback;
1401   client->th.callback = NULL;
1402   client->last_activity = GNUNET_TIME_absolute_get ();
1403   return callback (client->th.callback_cls, size, buf);
1404 }
1405
1406
1407 /**
1408  * Notify us when the server has enough space to transmit
1409  * a message of the given size to the given client.
1410  *
1411  * @param client client to transmit message to
1412  * @param size requested amount of buffer space
1413  * @param timeout after how long should we give up (and call
1414  *        notify with buf NULL and size 0)?
1415  * @param callback function to call when space is available
1416  * @param callback_cls closure for callback
1417  * @return non-NULL if the notify callback was queued; can be used
1418  *           to cancel the request using
1419  *           GNUNET_SERVER_notify_transmit_ready_cancel.
1420  *         NULL if we are already going to notify someone else (busy)
1421  */
1422 struct GNUNET_SERVER_TransmitHandle *
1423 GNUNET_SERVER_notify_transmit_ready (struct GNUNET_SERVER_Client *client,
1424                                      size_t size,
1425                                      struct GNUNET_TIME_Relative timeout,
1426                                      GNUNET_CONNECTION_TransmitReadyNotify
1427                                      callback, void *callback_cls)
1428 {
1429   if (NULL != client->th.callback)
1430     return NULL;
1431   client->th.callback_cls = callback_cls;
1432   client->th.callback = callback;
1433   client->th.cth = GNUNET_CONNECTION_notify_transmit_ready (client->connection, size,
1434                                                             timeout,
1435                                                             &transmit_ready_callback_wrapper,
1436                                                             client);
1437   return &client->th;
1438 }
1439
1440
1441 /**
1442  * Abort transmission request.
1443  *
1444  * @param th request to abort
1445  */
1446 void
1447 GNUNET_SERVER_notify_transmit_ready_cancel (struct GNUNET_SERVER_TransmitHandle *th)
1448 {
1449   GNUNET_CONNECTION_notify_transmit_ready_cancel (th->cth);
1450   th->cth = NULL;
1451   th->callback = NULL;
1452 }
1453
1454
1455 /**
1456  * Set the persistent flag on this client, used to setup client connection
1457  * to only be killed when the service it's connected to is actually dead.
1458  *
1459  * @param client the client to set the persistent flag on
1460  */
1461 void
1462 GNUNET_SERVER_client_persist_ (struct GNUNET_SERVER_Client *client)
1463 {
1464   client->persist = GNUNET_YES;
1465 }
1466
1467
1468 /**
1469  * Resume receiving from this client, we are done processing the
1470  * current request.  This function must be called from within each
1471  * GNUNET_SERVER_MessageCallback (or its respective continuations).
1472  *
1473  * @param client client we were processing a message of
1474  * @param success GNUNET_OK to keep the connection open and
1475  *                          continue to receive
1476  *                GNUNET_NO to close the connection (normal behavior)
1477  *                GNUNET_SYSERR to close the connection (signal
1478  *                          serious error)
1479  */
1480 void
1481 GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
1482 {
1483   if (NULL == client)
1484     return;
1485   GNUNET_assert (client->suspended > 0);
1486   client->suspended--;
1487   if (GNUNET_OK != success)
1488   {
1489     LOG (GNUNET_ERROR_TYPE_DEBUG,
1490          "GNUNET_SERVER_receive_done called with failure indication\n");
1491     if ( (client->reference_count > 0) || (client->suspended > 0) )
1492       client->shutdown_now = GNUNET_YES;
1493     else
1494       GNUNET_SERVER_client_disconnect (client);
1495     return;
1496   }
1497   if (client->suspended > 0)
1498   {
1499     LOG (GNUNET_ERROR_TYPE_DEBUG,
1500          "GNUNET_SERVER_receive_done called, but more clients pending\n");
1501     return;
1502   }
1503   if (GNUNET_SCHEDULER_NO_TASK != client->warn_task)
1504   {
1505     GNUNET_SCHEDULER_cancel (client->warn_task);
1506     client->warn_task = GNUNET_SCHEDULER_NO_TASK;
1507   }
1508   if (GNUNET_YES == client->in_process_client_buffer)
1509   {
1510     LOG (GNUNET_ERROR_TYPE_DEBUG,
1511          "GNUNET_SERVER_receive_done called while still in processing loop\n");
1512     return;
1513   }
1514   if ((NULL == client->server) || (GNUNET_YES == client->shutdown_now))
1515   {
1516     GNUNET_SERVER_client_disconnect (client);
1517     return;
1518   }
1519   LOG (GNUNET_ERROR_TYPE_DEBUG,
1520        "GNUNET_SERVER_receive_done causes restart in reading from the socket\n");
1521   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == client->restart_task);
1522   client->restart_task = GNUNET_SCHEDULER_add_now (&restart_processing, client);
1523 }
1524
1525
1526 /* end of server.c */