removing invalid assertions that shutdown must preceed closing
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_crypto_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_IOWriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_IOReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * The state of the protocol associated with this socket
285    */
286   enum State state;
287
288   /**
289    * The status of the socket
290    */
291   enum GNUNET_STREAM_Status status;
292
293   /**
294    * The number of previous timeouts; FIXME: currently not used
295    */
296   unsigned int retries;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * Is receive closed
305    */
306   int receive_closed;
307
308   /**
309    * Is transmission closed
310    */
311   int transmit_closed;
312
313   /**
314    * The application port number (type: uint32_t)
315    */
316   GNUNET_MESH_ApplicationType app_port;
317
318   /**
319    * The write sequence number to be set incase of testing
320    */
321   uint32_t testing_set_write_sequence_number_value;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363
364   /**
365    * The maximum size of the data message payload this stream handle can send
366    */
367   uint16_t max_payload_size;
368 };
369
370
371 /**
372  * A socket for listening
373  */
374 struct GNUNET_STREAM_ListenSocket
375 {
376   /**
377    * The mesh handle
378    */
379   struct GNUNET_MESH_Handle *mesh;
380
381   /**
382    * Handle to statistics
383    */
384   struct GNUNET_STATISTICS_Handle *stat_handle;
385
386   /**
387    * Our configuration
388    */
389   struct GNUNET_CONFIGURATION_Handle *cfg;
390
391   /**
392    * Handle to the lock manager service
393    */
394   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
395
396   /**
397    * The active LockingRequest from lockmanager
398    */
399   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
400
401   /**
402    * Callback to call after acquring a lock and listening
403    */
404   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
405
406   /**
407    * The callback function which is called after successful opening socket
408    */
409   GNUNET_STREAM_ListenCallback listen_cb;
410
411   /**
412    * The call back closure
413    */
414   void *listen_cb_cls;
415
416   /**
417    * The service port
418    */
419   GNUNET_MESH_ApplicationType port;
420   
421   /**
422    * The id of the lockmanager timeout task
423    */
424   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
425
426   /**
427    * The retransmit timeout
428    */
429   struct GNUNET_TIME_Relative retransmit_timeout;
430   
431   /**
432    * Listen enabled?
433    */
434   int listening;
435
436   /**
437    * Whether testing mode is active or not
438    */
439   int testing_active;
440
441   /**
442    * The write sequence number to be set incase of testing
443    */
444   uint32_t testing_set_write_sequence_number_value;
445
446   /**
447    * The maximum size of the data message payload this stream handle can send
448    */
449   uint16_t max_payload_size;
450
451 };
452
453
454 /**
455  * The IO Write Handle
456  */
457 struct GNUNET_STREAM_IOWriteHandle
458 {
459   /**
460    * The socket to which this write handle is associated
461    */
462   struct GNUNET_STREAM_Socket *socket;
463
464   /**
465    * The packet_buffers associated with this Handle
466    */
467   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
468
469   /**
470    * The write continuation callback
471    */
472   GNUNET_STREAM_CompletionContinuation write_cont;
473
474   /**
475    * Write continuation closure
476    */
477   void *write_cont_cls;
478
479   /**
480    * The bitmap of this IOHandle; Corresponding bit for a message is set when
481    * it has been acknowledged by the receiver
482    */
483   GNUNET_STREAM_AckBitmap ack_bitmap;
484
485   /**
486    * Number of bytes in this write handle
487    */
488   size_t size;
489
490   /**
491    * Number of packets already transmitted from this IO handle. Retransmitted
492    * packets are not taken into account here. This is used to determine which
493    * packets account for retransmission and which packets occupy buffer space at
494    * the receiver.
495    */
496   unsigned int packets_sent;
497 };
498
499
500 /**
501  * The IO Read Handle
502  */
503 struct GNUNET_STREAM_IOReadHandle
504 {
505   /**
506    * The socket to which this read handle is associated
507    */
508   struct GNUNET_STREAM_Socket *socket;
509   
510   /**
511    * Callback for the read processor
512    */
513   GNUNET_STREAM_DataProcessor proc;
514
515   /**
516    * The closure pointer for the read processor callback
517    */
518   void *proc_cls;
519
520   /**
521    * Task identifier for the read io timeout task
522    */
523   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
524
525   /**
526    * Task scheduled to continue a read operation.
527    */
528   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
529 };
530
531
532 /**
533  * Handle for Shutdown
534  */
535 struct GNUNET_STREAM_ShutdownHandle
536 {
537   /**
538    * The socket associated with this shutdown handle
539    */
540   struct GNUNET_STREAM_Socket *socket;
541
542   /**
543    * Shutdown completion callback
544    */
545   GNUNET_STREAM_ShutdownCompletion completion_cb;
546
547   /**
548    * Closure for completion callback
549    */
550   void *completion_cls;
551
552   /**
553    * Close message retransmission task id
554    */
555   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
556
557   /**
558    * Task scheduled to call the shutdown continuation callback
559    */
560   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
561
562   /**
563    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
564    */
565   int operation;  
566 };
567
568
569 /**
570  * Default value in seconds for various timeouts
571  */
572 static const unsigned int default_timeout = 10;
573
574 /**
575  * The domain name for locks we use here
576  */
577 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
578
579
580 /**
581  * Callback function for sending queued message
582  *
583  * @param cls closure the socket
584  * @param size number of bytes available in buf
585  * @param buf where the callee should write the message
586  * @return number of bytes written to buf
587  */
588 static size_t
589 send_message_notify (void *cls, size_t size, void *buf)
590 {
591   struct GNUNET_STREAM_Socket *socket = cls;
592   struct MessageQueue *head;
593   size_t ret;
594
595   socket->transmit_handle = NULL; /* Remove the transmit handle */
596   head = socket->queue_head;
597   if (NULL == head)
598     return 0; /* just to be safe */
599   if (0 == size)                /* request timed out */
600   {
601     socket->retries++;
602     LOG (GNUNET_ERROR_TYPE_DEBUG,
603          "%s: Message sending timed out. Retry %d \n",
604          GNUNET_i2s (&socket->other_peer),
605          socket->retries);
606     socket->transmit_handle = 
607       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
608                                          GNUNET_NO, /* Corking */
609                                          /* FIXME: exponential backoff */
610                                          socket->retransmit_timeout,
611                                          &socket->other_peer,
612                                          ntohs (head->message->header.size),
613                                          &send_message_notify,
614                                          socket);
615     return 0;
616   }
617   ret = ntohs (head->message->header.size);
618   GNUNET_assert (size >= ret);
619   memcpy (buf, head->message, ret);
620   if (NULL != head->finish_cb)
621   {
622     head->finish_cb (head->finish_cb_cls, socket);
623   }
624   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
625                                socket->queue_tail,
626                                head);
627   GNUNET_free (head->message);
628   GNUNET_free (head);
629   head = socket->queue_head;
630   if (NULL != head)    /* more pending messages to send */
631   {
632     socket->retries = 0;
633     socket->transmit_handle = 
634       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
635                                          GNUNET_NO, /* Corking */
636                                          /* FIXME: exponential backoff */
637                                          socket->retransmit_timeout,
638                                          &socket->other_peer,
639                                          ntohs (head->message->header.size),
640                                          &send_message_notify,
641                                          socket);
642   }
643   return ret;
644 }
645
646
647 /**
648  * Queues a message for sending using the mesh connection of a socket
649  *
650  * @param socket the socket whose mesh connection is used
651  * @param message the message to be sent
652  * @param finish_cb the callback to be called when the message is sent
653  * @param finish_cb_cls the closure for the callback
654  * @param urgent set to GNUNET_YES to add the message to the beginning of the
655  *          queue; GNUNET_NO to add at the tail
656  */
657 static void
658 queue_message (struct GNUNET_STREAM_Socket *socket,
659                struct GNUNET_STREAM_MessageHeader *message,
660                SendFinishCallback finish_cb,
661                void *finish_cb_cls,
662                int urgent)
663 {
664   struct MessageQueue *queue_entity;
665
666   GNUNET_assert 
667     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
668      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
669   LOG (GNUNET_ERROR_TYPE_DEBUG,
670        "%s: Queueing message of type %d and size %d\n",
671        GNUNET_i2s (&socket->other_peer),
672        ntohs (message->header.type),
673        ntohs (message->header.size));
674   GNUNET_assert (NULL != message);
675   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
676   queue_entity->message = message;
677   queue_entity->finish_cb = finish_cb;
678   queue_entity->finish_cb_cls = finish_cb_cls;
679   if (GNUNET_YES == urgent)
680   {
681     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
682                                  queue_entity);
683     if (NULL != socket->transmit_handle)
684     {
685       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
686       socket->transmit_handle = NULL;
687     }
688   }
689   else
690     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
691                                       socket->queue_tail,
692                                       queue_entity);
693   if (NULL == socket->transmit_handle)
694   {
695     socket->retries = 0;
696     socket->transmit_handle = 
697         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
698                                            GNUNET_NO, /* Corking */
699                                            socket->retransmit_timeout,
700                                            &socket->other_peer,
701                                            ntohs (message->header.size),
702                                            &send_message_notify,
703                                            socket);
704   }
705 }
706
707
708 /**
709  * Copies a message and queues it for sending using the mesh connection of
710  * given socket 
711  *
712  * @param socket the socket whose mesh connection is used
713  * @param message the message to be sent
714  * @param finish_cb the callback to be called when the message is sent
715  * @param finish_cb_cls the closure for the callback
716  */
717 static void
718 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
719                         const struct GNUNET_STREAM_MessageHeader *message,
720                         SendFinishCallback finish_cb,
721                         void *finish_cb_cls)
722 {
723   struct GNUNET_STREAM_MessageHeader *msg_copy;
724   uint16_t size;
725   
726   size = ntohs (message->header.size);
727   msg_copy = GNUNET_malloc (size);
728   memcpy (msg_copy, message, size);
729   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
730 }
731
732
733 /**
734  * Writes data using the given socket. The amount of data written is limited by
735  * the receiver_window_size
736  *
737  * @param socket the socket to use
738  */
739 static void 
740 write_data (struct GNUNET_STREAM_Socket *socket);
741
742
743 /**
744  * Task for retransmitting data messages if they aren't ACK before their ack
745  * deadline 
746  *
747  * @param cls the socket
748  * @param tc the Task context
749  */
750 static void
751 data_retransmission_task (void *cls,
752                           const struct GNUNET_SCHEDULER_TaskContext *tc)
753 {
754   struct GNUNET_STREAM_Socket *socket = cls;
755   
756   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
757   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
758     return;
759   LOG (GNUNET_ERROR_TYPE_DEBUG,
760        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
761   write_data (socket);
762 }
763
764
765 /**
766  * Task for sending ACK message
767  *
768  * @param cls the socket
769  * @param tc the Task context
770  */
771 static void
772 ack_task (void *cls,
773           const struct GNUNET_SCHEDULER_TaskContext *tc)
774 {
775   struct GNUNET_STREAM_Socket *socket = cls;
776   struct GNUNET_STREAM_AckMessage *ack_msg;
777
778   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
779   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
780     return;
781   /* Create the ACK Message */
782   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
783   ack_msg->header.header.size = htons (sizeof (struct 
784                                                GNUNET_STREAM_AckMessage));
785   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
786   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
787   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
788   ack_msg->receive_window_remaining = 
789     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
790   /* Queue up ACK for immediate sending */
791   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
792 }
793
794
795 /**
796  * Retransmission task for shutdown messages
797  *
798  * @param cls the shutdown handle
799  * @param tc the Task Context
800  */
801 static void
802 close_msg_retransmission_task (void *cls,
803                                const struct GNUNET_SCHEDULER_TaskContext *tc)
804 {
805   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
806   struct GNUNET_STREAM_MessageHeader *msg;
807   struct GNUNET_STREAM_Socket *socket;
808
809   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
810   GNUNET_assert (NULL != shutdown_handle);
811   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
812     return;
813   socket = shutdown_handle->socket;
814   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
815   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
816   switch (shutdown_handle->operation)
817   {
818   case SHUT_RDWR:
819     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
820     break;
821   case SHUT_RD:
822     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
823     break;
824   case SHUT_WR:
825     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
826     break;
827   default:
828     GNUNET_free (msg);
829     shutdown_handle->close_msg_retransmission_task_id = 
830       GNUNET_SCHEDULER_NO_TASK;
831     return;
832   }
833   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
834   shutdown_handle->close_msg_retransmission_task_id =
835     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
836                                   &close_msg_retransmission_task,
837                                   shutdown_handle);
838 }
839
840
841 /**
842  * Function to modify a bit in GNUNET_STREAM_AckBitmap
843  *
844  * @param bitmap the bitmap to modify
845  * @param bit the bit number to modify
846  * @param value GNUNET_YES to on, GNUNET_NO to off
847  */
848 static void
849 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
850                       unsigned int bit, 
851                       int value)
852 {
853   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
854   if (GNUNET_YES == value)
855     *bitmap |= (1LL << bit);
856   else
857     *bitmap &= ~(1LL << bit);
858 }
859
860
861 /**
862  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
863  *
864  * @param bitmap address of the bitmap that has to be checked
865  * @param bit the bit number to check
866  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
867  */
868 static uint8_t
869 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
870                       unsigned int bit)
871 {
872   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
873   return 0 != (*bitmap & (1LL << bit));
874 }
875
876
877 /**
878  * Writes data using the given socket. The amount of data written is limited by
879  * the receiver_window_size
880  *
881  * @param socket the socket to use
882  */
883 static void 
884 write_data (struct GNUNET_STREAM_Socket *socket)
885 {
886   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
887   unsigned int packet;
888   
889   for (packet=0; packet < io_handle->packets_sent; packet++)
890   {
891     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
892                                            packet))
893     {
894       LOG (GNUNET_ERROR_TYPE_DEBUG,
895            "%s: Retransmitting DATA message with sequence %u\n",
896            GNUNET_i2s (&socket->other_peer),
897            ntohl (io_handle->messages[packet]->sequence_number));
898       copy_and_queue_message (socket,
899                               &io_handle->messages[packet]->header,
900                               NULL,
901                               NULL);
902     }
903   }
904   /* Now send new packets if there is enough buffer space */
905   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
906          (NULL != io_handle->messages[packet]) &&
907          (socket->receiver_window_available 
908           >= ntohs (io_handle->messages[packet]->header.header.size)))
909   {
910     socket->receiver_window_available -= 
911       ntohs (io_handle->messages[packet]->header.header.size);
912     LOG (GNUNET_ERROR_TYPE_DEBUG,
913          "%s: Placing DATA message with sequence %u in send queue\n",
914          GNUNET_i2s (&socket->other_peer),
915          ntohl (io_handle->messages[packet]->sequence_number));
916     copy_and_queue_message (socket,
917                             &io_handle->messages[packet]->header,
918                             NULL,
919                             NULL);
920     packet++;
921   }
922   io_handle->packets_sent = packet;
923   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
924   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
925     socket->data_retransmission_task_id = 
926       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
927                                     (GNUNET_TIME_UNIT_SECONDS, 8),
928                                     &data_retransmission_task,
929                                     socket);
930 }
931
932
933 /**
934  * Task for calling the read processor
935  *
936  * @param cls the socket
937  * @param tc the task context
938  */
939 static void
940 call_read_processor (void *cls,
941                      const struct GNUNET_SCHEDULER_TaskContext *tc)
942 {
943   struct GNUNET_STREAM_Socket *socket = cls;
944   struct GNUNET_STREAM_IOReadHandle *read_handle;
945   size_t read_size;
946   size_t valid_read_size;
947   unsigned int packet;
948   uint32_t sequence_increase;
949   uint32_t offset_increase;
950
951   read_handle = socket->read_handle;
952   GNUNET_assert (NULL != read_handle);
953   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
954   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
955     return;
956   if (NULL == socket->receive_buffer) 
957     return;
958   GNUNET_assert (NULL != socket->read_handle);
959   GNUNET_assert (NULL != socket->read_handle->proc);
960   /* Check the bitmap for any holes */
961   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
962   {
963     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
964                                            packet))
965       break;
966   }
967   /* We only call read processor if we have the first packet */
968   GNUNET_assert (0 < packet);
969   valid_read_size = 
970     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
971   GNUNET_assert (0 != valid_read_size);
972   /* Cancel the read_io_timeout_task */
973   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
974   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
975   /* Call the data processor */
976   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
977        GNUNET_i2s (&socket->other_peer));
978   read_size = 
979       socket->read_handle->proc (socket->read_handle->proc_cls,
980                                  socket->status,
981                                  socket->receive_buffer + socket->copy_offset,
982                                  valid_read_size);
983   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
984        GNUNET_i2s (&socket->other_peer), read_size);
985   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
986        GNUNET_i2s (&socket->other_peer));
987   /* Free the read handle */
988   GNUNET_free (socket->read_handle);
989   socket->read_handle = NULL;
990   GNUNET_assert (read_size <= valid_read_size);
991   socket->copy_offset += read_size;
992   /* Determine upto which packet we can remove from the buffer */
993   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
994   {
995     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
996     { 
997       packet++; 
998       break;
999     }
1000     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1001       break;
1002   }
1003   /* If no packets can be removed we can't move the buffer */
1004   if (0 == packet) 
1005     return;
1006   sequence_increase = packet;
1007   LOG (GNUNET_ERROR_TYPE_DEBUG,
1008        "%s: Sequence increase after read processor completion: %u\n",
1009        GNUNET_i2s (&socket->other_peer), sequence_increase);
1010   /* Shift the data in the receive buffer */
1011   socket->receive_buffer = 
1012     memmove (socket->receive_buffer,
1013              socket->receive_buffer 
1014              + socket->receive_buffer_boundaries[sequence_increase-1],
1015              socket->receive_buffer_size
1016              - socket->receive_buffer_boundaries[sequence_increase-1]);
1017   /* Shift the bitmap */
1018   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1019   /* Set read_sequence_number */
1020   socket->read_sequence_number += sequence_increase;
1021   /* Set read_offset */
1022   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1023   socket->read_offset += offset_increase;
1024   /* Fix copy_offset */
1025   GNUNET_assert (offset_increase <= socket->copy_offset);
1026   socket->copy_offset -= offset_increase;
1027   /* Fix relative boundaries */
1028   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1029   {
1030     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1031     {
1032       uint32_t ahead_buffer_boundary;
1033
1034       ahead_buffer_boundary = 
1035         socket->receive_buffer_boundaries[packet + sequence_increase];
1036       if (0 == ahead_buffer_boundary)
1037         socket->receive_buffer_boundaries[packet] = 0;
1038       else
1039       {
1040         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1041         socket->receive_buffer_boundaries[packet] = 
1042           ahead_buffer_boundary - offset_increase;
1043       }
1044     }
1045     else
1046       socket->receive_buffer_boundaries[packet] = 0;
1047   }
1048 }
1049
1050
1051 /**
1052  * Cancels the existing read io handle
1053  *
1054  * @param cls the closure from the SCHEDULER call
1055  * @param tc the task context
1056  */
1057 static void
1058 read_io_timeout (void *cls,
1059                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1060 {
1061   struct GNUNET_STREAM_Socket *socket = cls;
1062   struct GNUNET_STREAM_IOReadHandle *read_handle;
1063   GNUNET_STREAM_DataProcessor proc;
1064   void *proc_cls;
1065
1066   read_handle = socket->read_handle;
1067   GNUNET_assert (NULL != read_handle);
1068   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1069   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1070     return;
1071   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1072   {
1073     LOG (GNUNET_ERROR_TYPE_DEBUG,
1074          "%s: Read task timedout - Cancelling it\n",
1075          GNUNET_i2s (&socket->other_peer));
1076     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1077     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1078   }
1079   proc = read_handle->proc;
1080   proc_cls = read_handle->proc_cls;
1081   GNUNET_free (read_handle);
1082   socket->read_handle = NULL;
1083   /* Call the read processor to signal timeout */
1084   proc (proc_cls,
1085         GNUNET_STREAM_TIMEOUT,
1086         NULL,
1087         0);
1088 }
1089
1090
1091 /**
1092  * Handler for DATA messages; Same for both client and server
1093  *
1094  * @param socket the socket through which the ack was received
1095  * @param tunnel connection to the other end
1096  * @param sender who sent the message
1097  * @param msg the data message
1098  * @param atsi performance data for the connection
1099  * @return GNUNET_OK to keep the connection open,
1100  *         GNUNET_SYSERR to close it (signal serious error)
1101  */
1102 static int
1103 handle_data (struct GNUNET_STREAM_Socket *socket,
1104              struct GNUNET_MESH_Tunnel *tunnel,
1105              const struct GNUNET_PeerIdentity *sender,
1106              const struct GNUNET_STREAM_DataMessage *msg,
1107              const struct GNUNET_ATS_Information*atsi)
1108 {
1109   const void *payload;
1110   struct GNUNET_TIME_Relative ack_deadline_rel;
1111   uint32_t bytes_needed;
1112   uint32_t relative_offset;
1113   uint32_t relative_sequence_number;
1114   uint16_t size;
1115
1116   size = htons (msg->header.header.size);
1117   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1118   {
1119     GNUNET_break_op (0);
1120     return GNUNET_SYSERR;
1121   }
1122   if (0 != memcmp (sender, &socket->other_peer,
1123                    sizeof (struct GNUNET_PeerIdentity)))
1124   {
1125     LOG (GNUNET_ERROR_TYPE_DEBUG,
1126          "%s: Received DATA from non-confirming peer\n",
1127          GNUNET_i2s (&socket->other_peer));
1128     return GNUNET_YES;
1129   }
1130   switch (socket->state)
1131   {
1132   case STATE_ESTABLISHED:
1133   case STATE_TRANSMIT_CLOSED:
1134   case STATE_TRANSMIT_CLOSE_WAIT:      
1135     /* check if the message's sequence number is in the range we are
1136        expecting */
1137     relative_sequence_number = 
1138       ntohl (msg->sequence_number) - socket->read_sequence_number;
1139     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1140     {
1141       LOG (GNUNET_ERROR_TYPE_DEBUG,
1142            "%s: Ignoring received message with sequence number %u\n",
1143            GNUNET_i2s (&socket->other_peer),
1144            ntohl (msg->sequence_number));
1145       /* Start ACK sending task if one is not already present */
1146       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1147       {
1148         socket->ack_task_id = 
1149           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1150                                         (msg->ack_deadline),
1151                                         &ack_task,
1152                                         socket);
1153       }
1154       return GNUNET_YES;
1155     }      
1156     /* Check if we have already seen this message */
1157     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1158                                             relative_sequence_number))
1159     {
1160       LOG (GNUNET_ERROR_TYPE_DEBUG,
1161            "%s: Ignoring already received message with sequence number %u\n",
1162            GNUNET_i2s (&socket->other_peer),
1163            ntohl (msg->sequence_number));
1164       /* Start ACK sending task if one is not already present */
1165       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1166       {
1167         socket->ack_task_id = 
1168           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1169                                         (msg->ack_deadline), &ack_task, socket);
1170       }
1171       return GNUNET_YES;
1172     }
1173     LOG (GNUNET_ERROR_TYPE_DEBUG,
1174          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1175          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1176          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1177     /* Check if we have to allocate the buffer */
1178     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1179     relative_offset = ntohl (msg->offset) - socket->read_offset;
1180     bytes_needed = relative_offset + size;
1181     if (bytes_needed > socket->receive_buffer_size)
1182     {
1183       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1184       {
1185         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1186                                                  bytes_needed);
1187         socket->receive_buffer_size = bytes_needed;
1188       }
1189       else
1190       {
1191         LOG (GNUNET_ERROR_TYPE_DEBUG,
1192              "%s: Cannot accommodate packet %d as buffer is full\n",
1193              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1194         return GNUNET_YES;
1195       }
1196     }
1197     /* Copy Data to buffer */
1198     payload = &msg[1];
1199     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1200     memcpy (socket->receive_buffer + relative_offset, payload, size);
1201     socket->receive_buffer_boundaries[relative_sequence_number] = 
1202         relative_offset + size;
1203     /* Modify the ACK bitmap */
1204     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1205                           GNUNET_YES);
1206     /* Start ACK sending task if one is not already present */
1207     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1208     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1209     {
1210       ack_deadline_rel = 
1211           GNUNET_TIME_relative_min (ack_deadline_rel,
1212                                     GNUNET_TIME_relative_multiply
1213                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1214       socket->ack_task_id = 
1215           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1216                                         (msg->ack_deadline), &ack_task, socket);
1217       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1218       socket->ack_time_deadline = ack_deadline_rel;
1219     }
1220     else
1221     {
1222       struct GNUNET_TIME_Relative ack_time_past;
1223       struct GNUNET_TIME_Relative ack_time_remaining;
1224       struct GNUNET_TIME_Relative ack_time_min;
1225       ack_time_past = 
1226           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1227       ack_time_remaining = GNUNET_TIME_relative_subtract
1228           (socket->ack_time_deadline, ack_time_past);
1229       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1230                                                ack_deadline_rel);
1231       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1232                       sizeof (struct GNUNET_TIME_Relative)))
1233       {
1234         ack_deadline_rel = ack_time_min;
1235         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1236         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1237                                                             &ack_task, socket);
1238         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1239         socket->ack_time_deadline = ack_deadline_rel;
1240       }
1241     }
1242     if ((NULL != socket->read_handle) /* A read handle is waiting */
1243         /* There is no current read task */
1244         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1245         /* We have the first packet */
1246         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1247     {
1248       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1249            GNUNET_i2s (&socket->other_peer));
1250       socket->read_handle->read_task_id =
1251           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1252     }
1253     break;
1254   default:
1255     LOG (GNUNET_ERROR_TYPE_DEBUG,
1256          "%s: Received data message when it cannot be handled\n",
1257          GNUNET_i2s (&socket->other_peer));
1258     break;
1259   }
1260   return GNUNET_YES;
1261 }
1262
1263
1264 /**
1265  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1266  *
1267  * @param cls the socket (set from GNUNET_MESH_connect)
1268  * @param tunnel connection to the other end
1269  * @param tunnel_ctx place to store local state associated with the tunnel
1270  * @param sender who sent the message
1271  * @param message the actual message
1272  * @param atsi performance data for the connection
1273  * @return GNUNET_OK to keep the connection open,
1274  *         GNUNET_SYSERR to close it (signal serious error)
1275  */
1276 static int
1277 client_handle_data (void *cls,
1278                     struct GNUNET_MESH_Tunnel *tunnel,
1279                     void **tunnel_ctx,
1280                     const struct GNUNET_PeerIdentity *sender,
1281                     const struct GNUNET_MessageHeader *message,
1282                     const struct GNUNET_ATS_Information*atsi)
1283 {
1284   struct GNUNET_STREAM_Socket *socket = cls;
1285
1286   return handle_data (socket, tunnel, sender, 
1287                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1288 }
1289
1290
1291 /**
1292  * Callback to set state to ESTABLISHED
1293  *
1294  * @param cls the closure NULL;
1295  * @param socket the socket to requiring state change
1296  */
1297 static void
1298 set_state_established (void *cls,
1299                        struct GNUNET_STREAM_Socket *socket)
1300 {
1301   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1302        "%s: Attaining ESTABLISHED state\n",
1303        GNUNET_i2s (&socket->other_peer));
1304   socket->write_offset = 0;
1305   socket->read_offset = 0;
1306   socket->state = STATE_ESTABLISHED;
1307   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1308                  socket->control_retransmission_task_id);
1309   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1310   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1311   if (NULL != socket->lsocket)
1312   {
1313     LOG (GNUNET_ERROR_TYPE_DEBUG,
1314          "%s: Calling listen callback\n",
1315          GNUNET_i2s (&socket->other_peer));
1316     if (GNUNET_SYSERR == 
1317         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1318                                     socket,
1319                                     &socket->other_peer))
1320     {
1321       socket->state = STATE_CLOSED;
1322       /* FIXME: We should close in a decent way (send RST) */
1323       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1324       GNUNET_free (socket);
1325     }
1326   }
1327   else
1328     socket->open_cb (socket->open_cls, socket);
1329 }
1330
1331
1332 /**
1333  * Callback to set state to HELLO_WAIT
1334  *
1335  * @param cls the closure from queue_message
1336  * @param socket the socket to requiring state change
1337  */
1338 static void
1339 set_state_hello_wait (void *cls,
1340                       struct GNUNET_STREAM_Socket *socket)
1341 {
1342   GNUNET_assert (STATE_INIT == socket->state);
1343   LOG (GNUNET_ERROR_TYPE_DEBUG,
1344        "%s: Attaining HELLO_WAIT state\n",
1345        GNUNET_i2s (&socket->other_peer));
1346   socket->state = STATE_HELLO_WAIT;
1347 }
1348
1349
1350 /**
1351  * Callback to set state to CLOSE_WAIT
1352  *
1353  * @param cls the closure from queue_message
1354  * @param socket the socket requiring state change
1355  */
1356 static void
1357 set_state_close_wait (void *cls,
1358                       struct GNUNET_STREAM_Socket *socket)
1359 {
1360   LOG (GNUNET_ERROR_TYPE_DEBUG,
1361        "%s: Attaing CLOSE_WAIT state\n",
1362        GNUNET_i2s (&socket->other_peer));
1363   socket->state = STATE_CLOSE_WAIT;
1364   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1365   socket->receive_buffer = NULL;
1366   socket->receive_buffer_size = 0;
1367 }
1368
1369
1370 /**
1371  * Callback to set state to RECEIVE_CLOSE_WAIT
1372  *
1373  * @param cls the closure from queue_message
1374  * @param socket the socket requiring state change
1375  */
1376 static void
1377 set_state_receive_close_wait (void *cls,
1378                               struct GNUNET_STREAM_Socket *socket)
1379 {
1380   LOG (GNUNET_ERROR_TYPE_DEBUG,
1381        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1382        GNUNET_i2s (&socket->other_peer));
1383   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1384   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1385   socket->receive_buffer = NULL;
1386   socket->receive_buffer_size = 0;
1387 }
1388
1389
1390 /**
1391  * Callback to set state to TRANSMIT_CLOSE_WAIT
1392  *
1393  * @param cls the closure from queue_message
1394  * @param socket the socket requiring state change
1395  */
1396 static void
1397 set_state_transmit_close_wait (void *cls,
1398                                struct GNUNET_STREAM_Socket *socket)
1399 {
1400   LOG (GNUNET_ERROR_TYPE_DEBUG,
1401        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1402        GNUNET_i2s (&socket->other_peer));
1403   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1404 }
1405
1406
1407 /**
1408  * Callback to set state to CLOSED
1409  *
1410  * @param cls the closure from queue_message
1411  * @param socket the socket requiring state change
1412  */
1413 static void
1414 set_state_closed (void *cls,
1415                   struct GNUNET_STREAM_Socket *socket)
1416 {
1417   socket->state = STATE_CLOSED;
1418 }
1419
1420
1421 /**
1422  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1423  *
1424  * @return the generate hello message
1425  */
1426 static struct GNUNET_STREAM_MessageHeader *
1427 generate_hello (void)
1428 {
1429   struct GNUNET_STREAM_MessageHeader *msg;
1430
1431   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1432   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1433   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1434   return msg;
1435 }
1436
1437
1438 /**
1439  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1440  * socket
1441  *
1442  * @param socket the socket for which this HelloAckMessage has to be generated
1443  * @param generate_seq GNUNET_YES to generate the write sequence number,
1444  *          GNUNET_NO to use the existing sequence number
1445  * @return the HelloAckMessage
1446  */
1447 static struct GNUNET_STREAM_HelloAckMessage *
1448 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1449                     int generate_seq)
1450 {
1451   struct GNUNET_STREAM_HelloAckMessage *msg;
1452
1453   if (GNUNET_YES == generate_seq)
1454   {
1455     if (GNUNET_YES == socket->testing_active)
1456       socket->write_sequence_number =
1457         socket->testing_set_write_sequence_number_value;
1458     else
1459       socket->write_sequence_number = 
1460         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1461     LOG_DEBUG ("%s: write sequence number %u\n",
1462                GNUNET_i2s (&socket->other_peer),
1463                (unsigned int) socket->write_sequence_number);
1464   }
1465   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1466   msg->header.header.size = 
1467     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1468   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1469   msg->sequence_number = htonl (socket->write_sequence_number);
1470   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1471   return msg;
1472 }
1473
1474
1475 /**
1476  * Task for retransmitting control messages if they aren't ACK'ed before a
1477  * deadline
1478  *
1479  * @param cls the socket
1480  * @param tc the Task context
1481  */
1482 static void
1483 control_retransmission_task (void *cls,
1484                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1485 {
1486   struct GNUNET_STREAM_Socket *socket = cls;
1487     
1488   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1489   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1490     return;
1491   LOG_DEBUG ("%s: Retransmitting a control message\n",
1492                  GNUNET_i2s (&socket->other_peer));
1493   switch (socket->state)
1494   {
1495   case STATE_INIT:    
1496     GNUNET_break (0);
1497     break;
1498   case STATE_LISTEN:
1499     GNUNET_break (0);
1500     break;
1501   case STATE_HELLO_WAIT:
1502     if (NULL == socket->lsocket) /* We are client */
1503       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1504     else
1505       queue_message (socket,
1506                      (struct GNUNET_STREAM_MessageHeader *)
1507                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1508                      GNUNET_NO);
1509     socket->control_retransmission_task_id =
1510     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1511                                   &control_retransmission_task, socket);
1512     break;
1513   case STATE_ESTABLISHED:
1514     if (NULL == socket->lsocket)
1515       queue_message (socket,
1516                      (struct GNUNET_STREAM_MessageHeader *)
1517                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1518                      GNUNET_NO);
1519     else
1520       GNUNET_break (0);
1521     break;
1522   default:
1523     GNUNET_break (0);
1524   }  
1525 }
1526
1527
1528 /**
1529  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1530  *
1531  * @param cls the socket (set from GNUNET_MESH_connect)
1532  * @param tunnel connection to the other end
1533  * @param tunnel_ctx this is NULL
1534  * @param sender who sent the message
1535  * @param message the actual message
1536  * @param atsi performance data for the connection
1537  * @return GNUNET_OK to keep the connection open,
1538  *         GNUNET_SYSERR to close it (signal serious error)
1539  */
1540 static int
1541 client_handle_hello_ack (void *cls,
1542                          struct GNUNET_MESH_Tunnel *tunnel,
1543                          void **tunnel_ctx,
1544                          const struct GNUNET_PeerIdentity *sender,
1545                          const struct GNUNET_MessageHeader *message,
1546                          const struct GNUNET_ATS_Information*atsi)
1547 {
1548   struct GNUNET_STREAM_Socket *socket = cls;
1549   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1550   struct GNUNET_STREAM_HelloAckMessage *reply;
1551
1552   if (0 != memcmp (sender, &socket->other_peer,
1553                    sizeof (struct GNUNET_PeerIdentity)))
1554   {
1555     LOG (GNUNET_ERROR_TYPE_DEBUG,
1556          "%s: Received HELLO_ACK from non-confirming peer\n",
1557          GNUNET_i2s (&socket->other_peer));
1558     return GNUNET_YES;
1559   }
1560   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1561   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1562        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1563   GNUNET_assert (socket->tunnel == tunnel);
1564   switch (socket->state)
1565   {
1566   case STATE_HELLO_WAIT:
1567     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1568     LOG (GNUNET_ERROR_TYPE_DEBUG,
1569          "%s: Read sequence number %u\n",
1570          GNUNET_i2s (&socket->other_peer),
1571          (unsigned int) socket->read_sequence_number);
1572     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1573     reply = generate_hello_ack (socket, GNUNET_YES);
1574     queue_message (socket, &reply->header, &set_state_established,
1575                    NULL, GNUNET_NO);    
1576     return GNUNET_OK;
1577   case STATE_ESTABLISHED:
1578     // call statistics (# ACKs ignored++)
1579     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1580                    socket->control_retransmission_task_id);
1581     socket->control_retransmission_task_id =
1582       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1583     return GNUNET_OK;
1584   default:
1585     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1586                GNUNET_i2s (&socket->other_peer),
1587                GNUNET_i2s (&socket->other_peer), socket->state);
1588     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1589     return GNUNET_SYSERR;
1590   }
1591 }
1592
1593
1594 /**
1595  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1596  *
1597  * @param cls the socket (set from GNUNET_MESH_connect)
1598  * @param tunnel connection to the other end
1599  * @param tunnel_ctx this is NULL
1600  * @param sender who sent the message
1601  * @param message the actual message
1602  * @param atsi performance data for the connection
1603  * @return GNUNET_OK to keep the connection open,
1604  *         GNUNET_SYSERR to close it (signal serious error)
1605  */
1606 static int
1607 client_handle_reset (void *cls,
1608                      struct GNUNET_MESH_Tunnel *tunnel,
1609                      void **tunnel_ctx,
1610                      const struct GNUNET_PeerIdentity *sender,
1611                      const struct GNUNET_MessageHeader *message,
1612                      const struct GNUNET_ATS_Information*atsi)
1613 {
1614   // struct GNUNET_STREAM_Socket *socket = cls;
1615
1616   return GNUNET_OK;
1617 }
1618
1619
1620 /**
1621  * Common message handler for handling TRANSMIT_CLOSE messages
1622  *
1623  * @param socket the socket through which the ack was received
1624  * @param tunnel connection to the other end
1625  * @param sender who sent the message
1626  * @param msg the transmit close message
1627  * @param atsi performance data for the connection
1628  * @return GNUNET_OK to keep the connection open,
1629  *         GNUNET_SYSERR to close it (signal serious error)
1630  */
1631 static int
1632 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1633                        struct GNUNET_MESH_Tunnel *tunnel,
1634                        const struct GNUNET_PeerIdentity *sender,
1635                        const struct GNUNET_STREAM_MessageHeader *msg,
1636                        const struct GNUNET_ATS_Information*atsi)
1637 {
1638   struct GNUNET_STREAM_MessageHeader *reply;
1639
1640   switch (socket->state)
1641   {
1642   case STATE_INIT:
1643   case STATE_LISTEN:
1644   case STATE_HELLO_WAIT:
1645     LOG (GNUNET_ERROR_TYPE_DEBUG,
1646          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1647          GNUNET_i2s (&socket->other_peer));
1648     return GNUNET_OK;
1649   default:
1650     break;
1651   }
1652   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1653        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1654   socket->receive_closed = GNUNET_YES;
1655   if (GNUNET_YES == socket->transmit_closed)
1656     socket->state = STATE_CLOSED;
1657   else
1658     socket->state = STATE_RECEIVE_CLOSED;
1659   /* Send TRANSMIT_CLOSE_ACK */
1660   reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1661   reply->header.type = 
1662       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1663   reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1664   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1665   return GNUNET_OK;
1666 }
1667
1668
1669 /**
1670  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1671  *
1672  * @param cls the socket (set from GNUNET_MESH_connect)
1673  * @param tunnel connection to the other end
1674  * @param tunnel_ctx this is NULL
1675  * @param sender who sent the message
1676  * @param message the actual message
1677  * @param atsi performance data for the connection
1678  * @return GNUNET_OK to keep the connection open,
1679  *         GNUNET_SYSERR to close it (signal serious error)
1680  */
1681 static int
1682 client_handle_transmit_close (void *cls,
1683                               struct GNUNET_MESH_Tunnel *tunnel,
1684                               void **tunnel_ctx,
1685                               const struct GNUNET_PeerIdentity *sender,
1686                               const struct GNUNET_MessageHeader *message,
1687                               const struct GNUNET_ATS_Information*atsi)
1688 {
1689   struct GNUNET_STREAM_Socket *socket = cls;
1690   
1691   return handle_transmit_close (socket,
1692                                 tunnel,
1693                                 sender,
1694                                 (struct GNUNET_STREAM_MessageHeader *)message,
1695                                 atsi);
1696 }
1697
1698
1699 /**
1700  * Task for calling the shutdown continuation callback
1701  *
1702  * @param cls the socket
1703  * @param tc the scheduler task context
1704  */
1705 static void
1706 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1707 {
1708   struct GNUNET_STREAM_Socket *socket = cls;
1709   
1710   GNUNET_assert (NULL != socket->shutdown_handle);
1711   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1712   if (NULL != socket->shutdown_handle->completion_cb)
1713     socket->shutdown_handle->completion_cb
1714         (socket->shutdown_handle->completion_cls,
1715          socket->shutdown_handle->operation);
1716   GNUNET_free (socket->shutdown_handle);
1717   socket->shutdown_handle = NULL;
1718 }
1719
1720
1721 /**
1722  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1723  *
1724  * @param socket the socket
1725  * @param tunnel connection to the other end
1726  * @param sender who sent the message
1727  * @param message the actual message
1728  * @param atsi performance data for the connection
1729  * @param operation the close operation which is being ACK'ed
1730  * @return GNUNET_OK to keep the connection open,
1731  *         GNUNET_SYSERR to close it (signal serious error)
1732  */
1733 static int
1734 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1735                           struct GNUNET_MESH_Tunnel *tunnel,
1736                           const struct GNUNET_PeerIdentity *sender,
1737                           const struct GNUNET_STREAM_MessageHeader *message,
1738                           const struct GNUNET_ATS_Information *atsi,
1739                           int operation)
1740 {
1741   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1742
1743   shutdown_handle = socket->shutdown_handle;
1744   if (NULL == shutdown_handle)
1745   {
1746     /* This may happen when the shudown handle is cancelled */
1747     LOG (GNUNET_ERROR_TYPE_DEBUG,
1748          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1749          GNUNET_i2s (&socket->other_peer));
1750     return GNUNET_OK;
1751   }
1752   switch (operation)
1753   {
1754   case SHUT_RDWR:
1755     switch (socket->state)
1756     {
1757     case STATE_CLOSE_WAIT:
1758       if (SHUT_RDWR != shutdown_handle->operation)
1759       {
1760         LOG (GNUNET_ERROR_TYPE_DEBUG,
1761              "%s: Received CLOSE_ACK when shutdown handle is not for "
1762              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1763         return GNUNET_OK;
1764       }
1765       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1766            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1767       socket->state = STATE_CLOSED;
1768       break;
1769     default:
1770       LOG (GNUNET_ERROR_TYPE_DEBUG,
1771            "%s: Received CLOSE_ACK when it is not expected\n",
1772            GNUNET_i2s (&socket->other_peer));
1773       return GNUNET_OK;
1774     }
1775     break;
1776   case SHUT_RD:
1777     switch (socket->state)
1778     {
1779     case STATE_RECEIVE_CLOSE_WAIT:
1780       if (SHUT_RD != shutdown_handle->operation)
1781       {
1782         LOG (GNUNET_ERROR_TYPE_DEBUG,
1783              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1784              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1785         return GNUNET_OK;
1786       }
1787       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1788            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1789       socket->state = STATE_RECEIVE_CLOSED;
1790       break;
1791     default:
1792       LOG (GNUNET_ERROR_TYPE_DEBUG,
1793            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1794            GNUNET_i2s (&socket->other_peer));
1795       return GNUNET_OK;
1796     }
1797     break;
1798   case SHUT_WR:
1799     switch (socket->state)
1800     {
1801     case STATE_TRANSMIT_CLOSE_WAIT:
1802       if (SHUT_WR != shutdown_handle->operation)
1803       {
1804         LOG (GNUNET_ERROR_TYPE_DEBUG,
1805              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1806              "is not for SHUT_WR\n",
1807              GNUNET_i2s (&socket->other_peer));
1808         return GNUNET_OK;
1809       }
1810       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1811            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1812       socket->state = STATE_TRANSMIT_CLOSED;
1813       break;
1814     default:
1815       LOG (GNUNET_ERROR_TYPE_DEBUG,
1816            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1817            GNUNET_i2s (&socket->other_peer));          
1818       return GNUNET_OK;
1819     }
1820     break;
1821   default:
1822     GNUNET_assert (0);
1823   }
1824   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1825       (&call_cont_task, socket);
1826   if (GNUNET_SCHEDULER_NO_TASK
1827       != shutdown_handle->close_msg_retransmission_task_id)
1828   {
1829     GNUNET_SCHEDULER_cancel
1830       (shutdown_handle->close_msg_retransmission_task_id);
1831     shutdown_handle->close_msg_retransmission_task_id =
1832         GNUNET_SCHEDULER_NO_TASK;
1833   }
1834   return GNUNET_OK;
1835 }
1836
1837
1838 /**
1839  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1840  *
1841  * @param cls the socket (set from GNUNET_MESH_connect)
1842  * @param tunnel connection to the other end
1843  * @param tunnel_ctx this is NULL
1844  * @param sender who sent the message
1845  * @param message the actual message
1846  * @param atsi performance data for the connection
1847  * @return GNUNET_OK to keep the connection open,
1848  *         GNUNET_SYSERR to close it (signal serious error)
1849  */
1850 static int
1851 client_handle_transmit_close_ack (void *cls,
1852                                   struct GNUNET_MESH_Tunnel *tunnel,
1853                                   void **tunnel_ctx,
1854                                   const struct GNUNET_PeerIdentity *sender,
1855                                   const struct GNUNET_MessageHeader *message,
1856                                   const struct GNUNET_ATS_Information*atsi)
1857 {
1858   struct GNUNET_STREAM_Socket *socket = cls;
1859
1860   return handle_generic_close_ack (socket,
1861                                    tunnel,
1862                                    sender,
1863                                    (const struct GNUNET_STREAM_MessageHeader *)
1864                                    message,
1865                                    atsi,
1866                                    SHUT_WR);
1867 }
1868
1869
1870 /**
1871  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1872  *
1873  * @param socket the socket
1874  * @param tunnel connection to the other end
1875  * @param sender who sent the message
1876  * @param message the actual message
1877  * @param atsi performance data for the connection
1878  * @return GNUNET_OK to keep the connection open,
1879  *         GNUNET_SYSERR to close it (signal serious error)
1880  */
1881 static int
1882 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1883                       struct GNUNET_MESH_Tunnel *tunnel,
1884                       const struct GNUNET_PeerIdentity *sender,
1885                       const struct GNUNET_STREAM_MessageHeader *message,
1886                       const struct GNUNET_ATS_Information *atsi)
1887 {
1888   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1889
1890   switch (socket->state)
1891   {
1892   case STATE_INIT:
1893   case STATE_LISTEN:
1894   case STATE_HELLO_WAIT:
1895     LOG (GNUNET_ERROR_TYPE_DEBUG,
1896          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1897          GNUNET_i2s (&socket->other_peer));
1898     return GNUNET_OK;
1899   default:
1900     break;
1901   }  
1902   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1903        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1904   socket->transmit_closed = GNUNET_YES;
1905   if (GNUNET_YES == socket->receive_closed)
1906     socket->state = STATE_CLOSED;
1907   else
1908     socket->state = STATE_TRANSMIT_CLOSED;
1909   receive_close_ack =
1910     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1911   receive_close_ack->header.size =
1912     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1913   receive_close_ack->header.type =
1914     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1915   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1916   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1917      that that stream has been shutdown */
1918   if (NULL != socket->write_handle)
1919   {
1920     GNUNET_STREAM_CompletionContinuation wc;
1921     void *wc_cls;
1922
1923     wc = socket->write_handle->write_cont;
1924     wc_cls = socket->write_handle->write_cont_cls;
1925     GNUNET_STREAM_io_write_cancel (socket->write_handle);
1926     socket->write_handle = NULL;
1927     if (NULL != wc)
1928       wc (wc_cls,
1929           GNUNET_STREAM_SHUTDOWN, 0);
1930   }
1931   return GNUNET_OK;
1932 }
1933
1934
1935 /**
1936  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1937  *
1938  * @param cls the socket (set from GNUNET_MESH_connect)
1939  * @param tunnel connection to the other end
1940  * @param tunnel_ctx this is NULL
1941  * @param sender who sent the message
1942  * @param message the actual message
1943  * @param atsi performance data for the connection
1944  * @return GNUNET_OK to keep the connection open,
1945  *         GNUNET_SYSERR to close it (signal serious error)
1946  */
1947 static int
1948 client_handle_receive_close (void *cls,
1949                              struct GNUNET_MESH_Tunnel *tunnel,
1950                              void **tunnel_ctx,
1951                              const struct GNUNET_PeerIdentity *sender,
1952                              const struct GNUNET_MessageHeader *message,
1953                              const struct GNUNET_ATS_Information*atsi)
1954 {
1955   struct GNUNET_STREAM_Socket *socket = cls;
1956
1957   return
1958     handle_receive_close (socket,
1959                           tunnel,
1960                           sender,
1961                           (const struct GNUNET_STREAM_MessageHeader *) message,
1962                           atsi);
1963 }
1964
1965
1966 /**
1967  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1968  *
1969  * @param cls the socket (set from GNUNET_MESH_connect)
1970  * @param tunnel connection to the other end
1971  * @param tunnel_ctx this is NULL
1972  * @param sender who sent the message
1973  * @param message the actual message
1974  * @param atsi performance data for the connection
1975  * @return GNUNET_OK to keep the connection open,
1976  *         GNUNET_SYSERR to close it (signal serious error)
1977  */
1978 static int
1979 client_handle_receive_close_ack (void *cls,
1980                                  struct GNUNET_MESH_Tunnel *tunnel,
1981                                  void **tunnel_ctx,
1982                                  const struct GNUNET_PeerIdentity *sender,
1983                                  const struct GNUNET_MessageHeader *message,
1984                                  const struct GNUNET_ATS_Information*atsi)
1985 {
1986   struct GNUNET_STREAM_Socket *socket = cls;
1987
1988   return handle_generic_close_ack (socket,
1989                                    tunnel,
1990                                    sender,
1991                                    (const struct GNUNET_STREAM_MessageHeader *)
1992                                    message,
1993                                    atsi,
1994                                    SHUT_RD);
1995 }
1996
1997
1998 /**
1999  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2000  *
2001  * @param socket the socket
2002  * @param tunnel connection to the other end
2003  * @param sender who sent the message
2004  * @param message the actual message
2005  * @param atsi performance data for the connection
2006  * @return GNUNET_OK to keep the connection open,
2007  *         GNUNET_SYSERR to close it (signal serious error)
2008  */
2009 static int
2010 handle_close (struct GNUNET_STREAM_Socket *socket,
2011               struct GNUNET_MESH_Tunnel *tunnel,
2012               const struct GNUNET_PeerIdentity *sender,
2013               const struct GNUNET_STREAM_MessageHeader *message,
2014               const struct GNUNET_ATS_Information*atsi)
2015 {
2016   struct GNUNET_STREAM_MessageHeader *close_ack;
2017
2018   switch (socket->state)
2019   {
2020   case STATE_INIT:
2021   case STATE_LISTEN:
2022   case STATE_HELLO_WAIT:
2023     LOG (GNUNET_ERROR_TYPE_DEBUG,
2024          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2025          GNUNET_i2s (&socket->other_peer));
2026     return GNUNET_OK;
2027   default:
2028     break;
2029   }
2030   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2031        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2032   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2033   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2034   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2035   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2036   if (STATE_CLOSED == socket->state)
2037     return GNUNET_OK;
2038   socket->receive_closed = GNUNET_YES;
2039   socket->transmit_closed = GNUNET_YES;
2040   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2041   socket->receive_buffer = NULL;
2042   socket->receive_buffer_size = 0;  
2043   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
2044      that that stream has been shutdown */
2045   if (NULL != socket->write_handle)
2046   {
2047     GNUNET_STREAM_CompletionContinuation wc;
2048     void *wc_cls;
2049
2050     wc = socket->write_handle->write_cont;
2051     wc_cls = socket->write_handle->write_cont_cls;
2052     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2053     socket->write_handle = NULL;
2054     if (NULL != wc)
2055       wc (wc_cls,
2056           GNUNET_STREAM_SHUTDOWN, 0);
2057   }
2058   return GNUNET_OK;
2059 }
2060
2061
2062 /**
2063  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2064  *
2065  * @param cls the socket (set from GNUNET_MESH_connect)
2066  * @param tunnel connection to the other end
2067  * @param tunnel_ctx this is NULL
2068  * @param sender who sent the message
2069  * @param message the actual message
2070  * @param atsi performance data for the connection
2071  * @return GNUNET_OK to keep the connection open,
2072  *         GNUNET_SYSERR to close it (signal serious error)
2073  */
2074 static int
2075 client_handle_close (void *cls,
2076                      struct GNUNET_MESH_Tunnel *tunnel,
2077                      void **tunnel_ctx,
2078                      const struct GNUNET_PeerIdentity *sender,
2079                      const struct GNUNET_MessageHeader *message,
2080                      const struct GNUNET_ATS_Information*atsi)
2081 {
2082   struct GNUNET_STREAM_Socket *socket = cls;
2083
2084   return handle_close (socket,
2085                        tunnel,
2086                        sender,
2087                        (const struct GNUNET_STREAM_MessageHeader *) message,
2088                        atsi);
2089 }
2090
2091
2092 /**
2093  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2094  *
2095  * @param cls the socket (set from GNUNET_MESH_connect)
2096  * @param tunnel connection to the other end
2097  * @param tunnel_ctx this is NULL
2098  * @param sender who sent the message
2099  * @param message the actual message
2100  * @param atsi performance data for the connection
2101  * @return GNUNET_OK to keep the connection open,
2102  *         GNUNET_SYSERR to close it (signal serious error)
2103  */
2104 static int
2105 client_handle_close_ack (void *cls,
2106                          struct GNUNET_MESH_Tunnel *tunnel,
2107                          void **tunnel_ctx,
2108                          const struct GNUNET_PeerIdentity *sender,
2109                          const struct GNUNET_MessageHeader *message,
2110                          const struct GNUNET_ATS_Information *atsi)
2111 {
2112   struct GNUNET_STREAM_Socket *socket = cls;
2113
2114   return handle_generic_close_ack (socket,
2115                                    tunnel,
2116                                    sender,
2117                                    (const struct GNUNET_STREAM_MessageHeader *) 
2118                                    message,
2119                                    atsi,
2120                                    SHUT_RDWR);
2121 }
2122
2123 /*****************************/
2124 /* Server's Message Handlers */
2125 /*****************************/
2126
2127 /**
2128  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2129  *
2130  * @param cls the closure
2131  * @param tunnel connection to the other end
2132  * @param tunnel_ctx the socket
2133  * @param sender who sent the message
2134  * @param message the actual message
2135  * @param atsi performance data for the connection
2136  * @return GNUNET_OK to keep the connection open,
2137  *         GNUNET_SYSERR to close it (signal serious error)
2138  */
2139 static int
2140 server_handle_data (void *cls,
2141                     struct GNUNET_MESH_Tunnel *tunnel,
2142                     void **tunnel_ctx,
2143                     const struct GNUNET_PeerIdentity *sender,
2144                     const struct GNUNET_MessageHeader *message,
2145                     const struct GNUNET_ATS_Information*atsi)
2146 {
2147   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2148
2149   return handle_data (socket,
2150                       tunnel,
2151                       sender,
2152                       (const struct GNUNET_STREAM_DataMessage *)message,
2153                       atsi);
2154 }
2155
2156
2157 /**
2158  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2159  *
2160  * @param cls the closure
2161  * @param tunnel connection to the other end
2162  * @param tunnel_ctx the socket
2163  * @param sender who sent the message
2164  * @param message the actual message
2165  * @param atsi performance data for the connection
2166  * @return GNUNET_OK to keep the connection open,
2167  *         GNUNET_SYSERR to close it (signal serious error)
2168  */
2169 static int
2170 server_handle_hello (void *cls,
2171                      struct GNUNET_MESH_Tunnel *tunnel,
2172                      void **tunnel_ctx,
2173                      const struct GNUNET_PeerIdentity *sender,
2174                      const struct GNUNET_MessageHeader *message,
2175                      const struct GNUNET_ATS_Information*atsi)
2176 {
2177   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2178   struct GNUNET_STREAM_HelloAckMessage *reply;
2179
2180   if (0 != memcmp (sender,
2181                    &socket->other_peer,
2182                    sizeof (struct GNUNET_PeerIdentity)))
2183   {
2184     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2185                GNUNET_i2s (&socket->other_peer));
2186     return GNUNET_YES;
2187   }
2188   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2189   GNUNET_assert (socket->tunnel == tunnel);
2190   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2191              GNUNET_i2s (&socket->other_peer));
2192   switch (socket->state)
2193   {
2194   case STATE_INIT:
2195     reply = generate_hello_ack (socket, GNUNET_YES);
2196     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2197                    GNUNET_NO);
2198     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2199                    socket->control_retransmission_task_id);
2200     socket->control_retransmission_task_id =
2201       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2202                                     &control_retransmission_task, socket);
2203     break;
2204   case STATE_HELLO_WAIT:
2205     /* Perhaps our HELLO_ACK was lost */
2206     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2207                    socket->control_retransmission_task_id);
2208     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2209     socket->control_retransmission_task_id =
2210       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2211     break;
2212   default:
2213     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2214                GNUNET_i2s (&socket->other_peer), socket->state);
2215     /* FIXME: Send RESET? */
2216   }
2217   return GNUNET_OK;
2218 }
2219
2220
2221 /**
2222  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2223  *
2224  * @param cls the closure
2225  * @param tunnel connection to the other end
2226  * @param tunnel_ctx the socket
2227  * @param sender who sent the message
2228  * @param message the actual message
2229  * @param atsi performance data for the connection
2230  * @return GNUNET_OK to keep the connection open,
2231  *         GNUNET_SYSERR to close it (signal serious error)
2232  */
2233 static int
2234 server_handle_hello_ack (void *cls,
2235                          struct GNUNET_MESH_Tunnel *tunnel,
2236                          void **tunnel_ctx,
2237                          const struct GNUNET_PeerIdentity *sender,
2238                          const struct GNUNET_MessageHeader *message,
2239                          const struct GNUNET_ATS_Information*atsi)
2240 {
2241   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2242   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2243
2244   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2245                  ntohs (message->type));
2246   GNUNET_assert (socket->tunnel == tunnel);
2247   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2248   switch (socket->state)  
2249   {
2250   case STATE_HELLO_WAIT:
2251     LOG (GNUNET_ERROR_TYPE_DEBUG,
2252          "%s: Received HELLO_ACK from %s\n",
2253          GNUNET_i2s (&socket->other_peer),
2254          GNUNET_i2s (&socket->other_peer));
2255     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2256     LOG (GNUNET_ERROR_TYPE_DEBUG,
2257          "%s: Read sequence number %u\n",
2258          GNUNET_i2s (&socket->other_peer),
2259          (unsigned int) socket->read_sequence_number);
2260     socket->receiver_window_available = 
2261       ntohl (ack_message->receiver_window_size);
2262     set_state_established (NULL, socket);
2263     break;
2264   default:
2265     LOG (GNUNET_ERROR_TYPE_DEBUG,
2266          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2267   }
2268   return GNUNET_OK;
2269 }
2270
2271
2272 /**
2273  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2274  *
2275  * @param cls the closure
2276  * @param tunnel connection to the other end
2277  * @param tunnel_ctx the socket
2278  * @param sender who sent the message
2279  * @param message the actual message
2280  * @param atsi performance data for the connection
2281  * @return GNUNET_OK to keep the connection open,
2282  *         GNUNET_SYSERR to close it (signal serious error)
2283  */
2284 static int
2285 server_handle_reset (void *cls,
2286                      struct GNUNET_MESH_Tunnel *tunnel,
2287                      void **tunnel_ctx,
2288                      const struct GNUNET_PeerIdentity *sender,
2289                      const struct GNUNET_MessageHeader *message,
2290                      const struct GNUNET_ATS_Information*atsi)
2291 {
2292   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2293
2294   return GNUNET_OK;
2295 }
2296
2297
2298 /**
2299  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2300  *
2301  * @param cls the closure
2302  * @param tunnel connection to the other end
2303  * @param tunnel_ctx the socket
2304  * @param sender who sent the message
2305  * @param message the actual message
2306  * @param atsi performance data for the connection
2307  * @return GNUNET_OK to keep the connection open,
2308  *         GNUNET_SYSERR to close it (signal serious error)
2309  */
2310 static int
2311 server_handle_transmit_close (void *cls,
2312                               struct GNUNET_MESH_Tunnel *tunnel,
2313                               void **tunnel_ctx,
2314                               const struct GNUNET_PeerIdentity *sender,
2315                               const struct GNUNET_MessageHeader *message,
2316                               const struct GNUNET_ATS_Information*atsi)
2317 {
2318   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2319
2320   return handle_transmit_close (socket,
2321                                 tunnel,
2322                                 sender,
2323                                 (struct GNUNET_STREAM_MessageHeader *)message,
2324                                 atsi);
2325 }
2326
2327
2328 /**
2329  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2330  *
2331  * @param cls the closure
2332  * @param tunnel connection to the other end
2333  * @param tunnel_ctx the socket
2334  * @param sender who sent the message
2335  * @param message the actual message
2336  * @param atsi performance data for the connection
2337  * @return GNUNET_OK to keep the connection open,
2338  *         GNUNET_SYSERR to close it (signal serious error)
2339  */
2340 static int
2341 server_handle_transmit_close_ack (void *cls,
2342                                   struct GNUNET_MESH_Tunnel *tunnel,
2343                                   void **tunnel_ctx,
2344                                   const struct GNUNET_PeerIdentity *sender,
2345                                   const struct GNUNET_MessageHeader *message,
2346                                   const struct GNUNET_ATS_Information*atsi)
2347 {
2348   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2349
2350   return handle_generic_close_ack (socket,
2351                                    tunnel,
2352                                    sender,
2353                                    (const struct GNUNET_STREAM_MessageHeader *)
2354                                    message,
2355                                    atsi,
2356                                    SHUT_WR);
2357 }
2358
2359
2360 /**
2361  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2362  *
2363  * @param cls the closure
2364  * @param tunnel connection to the other end
2365  * @param tunnel_ctx the socket
2366  * @param sender who sent the message
2367  * @param message the actual message
2368  * @param atsi performance data for the connection
2369  * @return GNUNET_OK to keep the connection open,
2370  *         GNUNET_SYSERR to close it (signal serious error)
2371  */
2372 static int
2373 server_handle_receive_close (void *cls,
2374                              struct GNUNET_MESH_Tunnel *tunnel,
2375                              void **tunnel_ctx,
2376                              const struct GNUNET_PeerIdentity *sender,
2377                              const struct GNUNET_MessageHeader *message,
2378                              const struct GNUNET_ATS_Information*atsi)
2379 {
2380   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2381
2382   return
2383     handle_receive_close (socket,
2384                           tunnel,
2385                           sender,
2386                           (const struct GNUNET_STREAM_MessageHeader *) message,
2387                           atsi);
2388 }
2389
2390
2391 /**
2392  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2393  *
2394  * @param cls the closure
2395  * @param tunnel connection to the other end
2396  * @param tunnel_ctx the socket
2397  * @param sender who sent the message
2398  * @param message the actual message
2399  * @param atsi performance data for the connection
2400  * @return GNUNET_OK to keep the connection open,
2401  *         GNUNET_SYSERR to close it (signal serious error)
2402  */
2403 static int
2404 server_handle_receive_close_ack (void *cls,
2405                                  struct GNUNET_MESH_Tunnel *tunnel,
2406                                  void **tunnel_ctx,
2407                                  const struct GNUNET_PeerIdentity *sender,
2408                                  const struct GNUNET_MessageHeader *message,
2409                                  const struct GNUNET_ATS_Information*atsi)
2410 {
2411   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2412
2413   return handle_generic_close_ack (socket,
2414                                    tunnel,
2415                                    sender,
2416                                    (const struct GNUNET_STREAM_MessageHeader *)
2417                                    message,
2418                                    atsi,
2419                                    SHUT_RD);
2420 }
2421
2422
2423 /**
2424  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2425  *
2426  * @param cls the listen socket (from GNUNET_MESH_connect in
2427  *          GNUNET_STREAM_listen) 
2428  * @param tunnel connection to the other end
2429  * @param tunnel_ctx the socket
2430  * @param sender who sent the message
2431  * @param message the actual message
2432  * @param atsi performance data for the connection
2433  * @return GNUNET_OK to keep the connection open,
2434  *         GNUNET_SYSERR to close it (signal serious error)
2435  */
2436 static int
2437 server_handle_close (void *cls,
2438                      struct GNUNET_MESH_Tunnel *tunnel,
2439                      void **tunnel_ctx,
2440                      const struct GNUNET_PeerIdentity *sender,
2441                      const struct GNUNET_MessageHeader *message,
2442                      const struct GNUNET_ATS_Information*atsi)
2443 {
2444   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2445   
2446   return handle_close (socket,
2447                        tunnel,
2448                        sender,
2449                        (const struct GNUNET_STREAM_MessageHeader *) message,
2450                        atsi);
2451 }
2452
2453
2454 /**
2455  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2456  *
2457  * @param cls the closure
2458  * @param tunnel connection to the other end
2459  * @param tunnel_ctx the socket
2460  * @param sender who sent the message
2461  * @param message the actual message
2462  * @param atsi performance data for the connection
2463  * @return GNUNET_OK to keep the connection open,
2464  *         GNUNET_SYSERR to close it (signal serious error)
2465  */
2466 static int
2467 server_handle_close_ack (void *cls,
2468                          struct GNUNET_MESH_Tunnel *tunnel,
2469                          void **tunnel_ctx,
2470                          const struct GNUNET_PeerIdentity *sender,
2471                          const struct GNUNET_MessageHeader *message,
2472                          const struct GNUNET_ATS_Information*atsi)
2473 {
2474   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2475
2476   return handle_generic_close_ack (socket,
2477                                    tunnel,
2478                                    sender,
2479                                    (const struct GNUNET_STREAM_MessageHeader *) 
2480                                    message,
2481                                    atsi,
2482                                    SHUT_RDWR);
2483 }
2484
2485
2486 /**
2487  * Handler for DATA_ACK messages
2488  *
2489  * @param socket the socket through which the ack was received
2490  * @param tunnel connection to the other end
2491  * @param sender who sent the message
2492  * @param ack the acknowledgment message
2493  * @param atsi performance data for the connection
2494  * @return GNUNET_OK to keep the connection open,
2495  *         GNUNET_SYSERR to close it (signal serious error)
2496  */
2497 static int
2498 handle_ack (struct GNUNET_STREAM_Socket *socket,
2499             struct GNUNET_MESH_Tunnel *tunnel,
2500             const struct GNUNET_PeerIdentity *sender,
2501             const struct GNUNET_STREAM_AckMessage *ack,
2502             const struct GNUNET_ATS_Information*atsi)
2503 {
2504   unsigned int packet;
2505   int need_retransmission;
2506   uint32_t sequence_difference;
2507   
2508   if (0 != memcmp (sender,
2509                    &socket->other_peer,
2510                    sizeof (struct GNUNET_PeerIdentity)))
2511   {
2512     LOG (GNUNET_ERROR_TYPE_DEBUG,
2513          "%s: Received ACK from non-confirming peer\n",
2514          GNUNET_i2s (&socket->other_peer));
2515     return GNUNET_YES;
2516   }
2517   switch (socket->state)
2518   {
2519   case (STATE_ESTABLISHED):
2520   case (STATE_RECEIVE_CLOSED):
2521   case (STATE_RECEIVE_CLOSE_WAIT):
2522     if (NULL == socket->write_handle)
2523     {
2524       LOG (GNUNET_ERROR_TYPE_DEBUG,
2525            "%s: Received DATA_ACK when write_handle is NULL\n",
2526            GNUNET_i2s (&socket->other_peer));
2527       return GNUNET_OK;
2528     }
2529     sequence_difference = 
2530         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2531     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2532     {
2533       LOG (GNUNET_ERROR_TYPE_DEBUG,
2534            "%s: Received DATA_ACK with unexpected base sequence number\n",
2535            GNUNET_i2s (&socket->other_peer));
2536       LOG (GNUNET_ERROR_TYPE_DEBUG,
2537            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2538            GNUNET_i2s (&socket->other_peer),
2539            socket->write_sequence_number,
2540            ntohl (ack->base_sequence_number));
2541       return GNUNET_OK;
2542     }
2543     /* FIXME: include the case when write_handle is cancelled - ignore the 
2544        acks */
2545     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2546          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2547     /* Cancel the retransmission task */
2548     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2549     {
2550       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2551       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2552     }
2553     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2554     {
2555       if (NULL == socket->write_handle->messages[packet]) break;
2556       /* BS: Base sequence from ack; PS: sequence num of current packet */
2557       sequence_difference = ntohl (ack->base_sequence_number)
2558         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2559       if ((0 == sequence_difference) ||
2560           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2561         continue; /* The message in our handle is not yet received */
2562       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2563       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2564       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2565                             packet, GNUNET_YES);
2566     }
2567     /* Update the receive window remaining
2568        FIXME : Should update with the value from a data ack with greater
2569        sequence number */
2570     socket->receiver_window_available = 
2571       ntohl (ack->receive_window_remaining);
2572     /* Check if we have received all acknowledgements */
2573     need_retransmission = GNUNET_NO;
2574     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2575     {
2576       if (NULL == socket->write_handle->messages[packet]) break;
2577       if (GNUNET_YES != ackbitmap_is_bit_set 
2578           (&socket->write_handle->ack_bitmap,packet))
2579       {
2580         need_retransmission = GNUNET_YES;
2581         break;
2582       }
2583     }
2584     if (GNUNET_YES == need_retransmission)
2585     {
2586       write_data (socket);
2587     }
2588     else      /* We have to call the write continuation callback now */
2589     {
2590       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2591       
2592       /* Free the packets */
2593       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2594       {
2595         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2596       }
2597       write_handle = socket->write_handle;
2598       socket->write_handle = NULL;
2599       if (NULL != write_handle->write_cont)
2600         write_handle->write_cont (write_handle->write_cont_cls,
2601                                   socket->status,
2602                                   write_handle->size);
2603       /* We are done with the write handle - Freeing it */
2604       GNUNET_free (write_handle);
2605       LOG (GNUNET_ERROR_TYPE_DEBUG,
2606            "%s: Write completion callback completed\n",
2607            GNUNET_i2s (&socket->other_peer));      
2608     }
2609     break;
2610   default:
2611     break;
2612   }
2613   return GNUNET_OK;
2614 }
2615
2616
2617 /**
2618  * Handler for DATA_ACK messages
2619  *
2620  * @param cls the 'struct GNUNET_STREAM_Socket'
2621  * @param tunnel connection to the other end
2622  * @param tunnel_ctx unused
2623  * @param sender who sent the message
2624  * @param message the actual message
2625  * @param atsi performance data for the connection
2626  * @return GNUNET_OK to keep the connection open,
2627  *         GNUNET_SYSERR to close it (signal serious error)
2628  */
2629 static int
2630 client_handle_ack (void *cls,
2631                    struct GNUNET_MESH_Tunnel *tunnel,
2632                    void **tunnel_ctx,
2633                    const struct GNUNET_PeerIdentity *sender,
2634                    const struct GNUNET_MessageHeader *message,
2635                    const struct GNUNET_ATS_Information*atsi)
2636 {
2637   struct GNUNET_STREAM_Socket *socket = cls;
2638   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2639  
2640   return handle_ack (socket, tunnel, sender, ack, atsi);
2641 }
2642
2643
2644 /**
2645  * Handler for DATA_ACK messages
2646  *
2647  * @param cls the server's listen socket
2648  * @param tunnel connection to the other end
2649  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2650  * @param sender who sent the message
2651  * @param message the actual message
2652  * @param atsi performance data for the connection
2653  * @return GNUNET_OK to keep the connection open,
2654  *         GNUNET_SYSERR to close it (signal serious error)
2655  */
2656 static int
2657 server_handle_ack (void *cls,
2658                    struct GNUNET_MESH_Tunnel *tunnel,
2659                    void **tunnel_ctx,
2660                    const struct GNUNET_PeerIdentity *sender,
2661                    const struct GNUNET_MessageHeader *message,
2662                    const struct GNUNET_ATS_Information*atsi)
2663 {
2664   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2665   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2666  
2667   return handle_ack (socket, tunnel, sender, ack, atsi);
2668 }
2669
2670
2671 /**
2672  * For client message handlers, the stream socket is in the
2673  * closure argument.
2674  */
2675 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2676   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2677   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2678    sizeof (struct GNUNET_STREAM_AckMessage) },
2679   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2680    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2681   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2682    sizeof (struct GNUNET_STREAM_MessageHeader)},
2683   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2684    sizeof (struct GNUNET_STREAM_MessageHeader)},
2685   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2686    sizeof (struct GNUNET_STREAM_MessageHeader)},
2687   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2688    sizeof (struct GNUNET_STREAM_MessageHeader)},
2689   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2690    sizeof (struct GNUNET_STREAM_MessageHeader)},
2691   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2692    sizeof (struct GNUNET_STREAM_MessageHeader)},
2693   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2694    sizeof (struct GNUNET_STREAM_MessageHeader)},
2695   {NULL, 0, 0}
2696 };
2697
2698
2699 /**
2700  * For server message handlers, the stream socket is in the
2701  * tunnel context, and the listen socket in the closure argument.
2702  */
2703 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2704   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2705   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2706    sizeof (struct GNUNET_STREAM_AckMessage) },
2707   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2708    sizeof (struct GNUNET_STREAM_MessageHeader)},
2709   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2710    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2711   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2712    sizeof (struct GNUNET_STREAM_MessageHeader)},
2713   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2714    sizeof (struct GNUNET_STREAM_MessageHeader)},
2715   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2716    sizeof (struct GNUNET_STREAM_MessageHeader)},
2717   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2718    sizeof (struct GNUNET_STREAM_MessageHeader)},
2719   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2720    sizeof (struct GNUNET_STREAM_MessageHeader)},
2721   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2722    sizeof (struct GNUNET_STREAM_MessageHeader)},
2723   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2724    sizeof (struct GNUNET_STREAM_MessageHeader)},
2725   {NULL, 0, 0}
2726 };
2727
2728
2729 /**
2730  * Function called when our target peer is connected to our tunnel
2731  *
2732  * @param cls the socket for which this tunnel is created
2733  * @param peer the peer identity of the target
2734  * @param atsi performance data for the connection
2735  */
2736 static void
2737 mesh_peer_connect_callback (void *cls,
2738                             const struct GNUNET_PeerIdentity *peer,
2739                             const struct GNUNET_ATS_Information * atsi)
2740 {
2741   struct GNUNET_STREAM_Socket *socket = cls;
2742   struct GNUNET_STREAM_MessageHeader *message;
2743   
2744   if (0 != memcmp (peer,
2745                    &socket->other_peer,
2746                    sizeof (struct GNUNET_PeerIdentity)))
2747   {
2748     LOG (GNUNET_ERROR_TYPE_DEBUG,
2749          "%s: A peer which is not our target has connected to our tunnel\n",
2750          GNUNET_i2s(peer));
2751     return;
2752   }
2753   LOG (GNUNET_ERROR_TYPE_DEBUG,
2754        "%s: Target peer %s connected\n",
2755        GNUNET_i2s (&socket->other_peer),
2756        GNUNET_i2s (&socket->other_peer));
2757   /* Set state to INIT */
2758   socket->state = STATE_INIT;
2759   /* Send HELLO message */
2760   message = generate_hello ();
2761   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2762   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2763                  socket->control_retransmission_task_id);
2764   socket->control_retransmission_task_id =
2765     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2766                                   &control_retransmission_task, socket);
2767 }
2768
2769
2770 /**
2771  * Function called when our target peer is disconnected from our tunnel
2772  *
2773  * @param cls the socket associated which this tunnel
2774  * @param peer the peer identity of the target
2775  */
2776 static void
2777 mesh_peer_disconnect_callback (void *cls,
2778                                const struct GNUNET_PeerIdentity *peer)
2779 {
2780   struct GNUNET_STREAM_Socket *socket=cls;
2781   
2782   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2783   LOG (GNUNET_ERROR_TYPE_DEBUG,
2784        "%s: Other peer %s disconnected \n",
2785        GNUNET_i2s (&socket->other_peer),
2786        GNUNET_i2s (&socket->other_peer));
2787 }
2788
2789
2790 /**
2791  * Method called whenever a peer creates a tunnel to us
2792  *
2793  * @param cls closure
2794  * @param tunnel new handle to the tunnel
2795  * @param initiator peer that started the tunnel
2796  * @param atsi performance information for the tunnel
2797  * @return initial tunnel context for the tunnel
2798  *         (can be NULL -- that's not an error)
2799  */
2800 static void *
2801 new_tunnel_notify (void *cls,
2802                    struct GNUNET_MESH_Tunnel *tunnel,
2803                    const struct GNUNET_PeerIdentity *initiator,
2804                    const struct GNUNET_ATS_Information *atsi)
2805 {
2806   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2807   struct GNUNET_STREAM_Socket *socket;
2808
2809   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2810      from the same peer again until the socket is closed */
2811
2812   if (GNUNET_NO == lsocket->listening)
2813   {
2814     GNUNET_MESH_tunnel_destroy (tunnel);
2815     return NULL;
2816   }
2817   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2818   socket->other_peer = *initiator;
2819   socket->tunnel = tunnel;
2820   socket->state = STATE_INIT;
2821   socket->lsocket = lsocket;
2822   socket->stat_handle = lsocket->stat_handle;
2823   socket->retransmit_timeout = lsocket->retransmit_timeout;
2824   socket->testing_active = lsocket->testing_active;
2825   socket->testing_set_write_sequence_number_value =
2826       lsocket->testing_set_write_sequence_number_value;
2827   socket->max_payload_size = lsocket->max_payload_size;
2828   LOG (GNUNET_ERROR_TYPE_DEBUG,
2829        "%s: Peer %s initiated tunnel to us\n", 
2830        GNUNET_i2s (&socket->other_peer),
2831        GNUNET_i2s (&socket->other_peer));
2832   if (NULL != socket->stat_handle)
2833   {
2834     GNUNET_STATISTICS_update (socket->stat_handle,
2835                               "total inbound connections received",
2836                               1, GNUNET_NO);
2837     GNUNET_STATISTICS_update (socket->stat_handle,
2838                               "inbound connections", 1, GNUNET_NO);
2839   }
2840   
2841   return socket;
2842 }
2843
2844
2845 /**
2846  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2847  * any associated state.  This function is NOT called if the client has
2848  * explicitly asked for the tunnel to be destroyed using
2849  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2850  * the tunnel.
2851  *
2852  * @param cls closure (set from GNUNET_MESH_connect)
2853  * @param tunnel connection to the other end (henceforth invalid)
2854  * @param tunnel_ctx place where local state associated
2855  *                   with the tunnel is stored
2856  */
2857 static void 
2858 tunnel_cleaner (void *cls,
2859                 const struct GNUNET_MESH_Tunnel *tunnel,
2860                 void *tunnel_ctx)
2861 {
2862   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2863   struct MessageQueue *head;
2864
2865   GNUNET_assert (tunnel == socket->tunnel);
2866   GNUNET_break_op(0);
2867   LOG (GNUNET_ERROR_TYPE_DEBUG,
2868        "%s: Peer %s has terminated connection abruptly\n",
2869        GNUNET_i2s (&socket->other_peer),
2870        GNUNET_i2s (&socket->other_peer));
2871   if (NULL != socket->stat_handle)
2872   {
2873     GNUNET_STATISTICS_update (socket->stat_handle,
2874                               "connections terminated abruptly", 1, GNUNET_NO);
2875     GNUNET_STATISTICS_update (socket->stat_handle,
2876                               "inbound connections", -1, GNUNET_NO);
2877   }
2878   socket->status = GNUNET_STREAM_SHUTDOWN;
2879   /* Clear Transmit handles */
2880   if (NULL != socket->transmit_handle)
2881   {
2882     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2883     socket->transmit_handle = NULL;
2884   }
2885   /* Stop Tasks using socket->tunnel */
2886   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2887   {
2888     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2889     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2890   }
2891   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2892   {
2893     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2894     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2895   }  
2896   /* Terminate the control retransmission tasks */
2897   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2898   {
2899     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2900     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2901   }
2902   /* Clear Transmit handles */
2903   if (NULL != socket->transmit_handle)
2904   {
2905     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2906     socket->transmit_handle = NULL;
2907   }
2908   /* Clear existing message queue */
2909   while (NULL != (head = socket->queue_head)) {
2910     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2911                                  socket->queue_tail,
2912                                  head);
2913     GNUNET_free (head->message);
2914     GNUNET_free (head);
2915   }
2916   socket->tunnel = NULL;
2917 }
2918
2919
2920 /**
2921  * Callback to signal timeout on lockmanager lock acquire
2922  *
2923  * @param cls the ListenSocket
2924  * @param tc the scheduler task context
2925  */
2926 static void
2927 lockmanager_acquire_timeout (void *cls, 
2928                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2929 {
2930   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2931   GNUNET_STREAM_ListenCallback listen_cb;
2932   void *listen_cb_cls;
2933
2934   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2935   listen_cb = lsocket->listen_cb;
2936   listen_cb_cls = lsocket->listen_cb_cls;
2937   if (NULL != listen_cb)
2938     listen_cb (listen_cb_cls, NULL, NULL);
2939 }
2940
2941
2942 /**
2943  * Callback to notify us on the status changes on app_port lock
2944  *
2945  * @param cls the ListenSocket
2946  * @param domain the domain name of the lock
2947  * @param lock the app_port
2948  * @param status the current status of the lock
2949  */
2950 static void
2951 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2952                        enum GNUNET_LOCKMANAGER_Status status)
2953 {
2954   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2955
2956   GNUNET_assert (lock == (uint32_t) lsocket->port);
2957   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2958   {
2959     lsocket->listening = GNUNET_YES;
2960     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2961     {
2962       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2963       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2964     }
2965     if (NULL == lsocket->mesh)
2966     {
2967       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2968
2969       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2970                                            lsocket, /* Closure */
2971                                            &new_tunnel_notify,
2972                                            &tunnel_cleaner,
2973                                            server_message_handlers,
2974                                            ports);
2975       GNUNET_assert (NULL != lsocket->mesh);
2976       if (NULL != lsocket->listen_ok_cb)
2977       {
2978         (void) lsocket->listen_ok_cb ();
2979       }
2980     }
2981   }
2982   if (GNUNET_LOCKMANAGER_RELEASE == status)
2983     lsocket->listening = GNUNET_NO;
2984 }
2985
2986
2987 /*****************/
2988 /* API functions */
2989 /*****************/
2990
2991
2992 /**
2993  * Tries to open a stream to the target peer
2994  *
2995  * @param cfg configuration to use
2996  * @param target the target peer to which the stream has to be opened
2997  * @param app_port the application port number which uniquely identifies this
2998  *            stream
2999  * @param open_cb this function will be called after stream has be established;
3000  *          cannot be NULL
3001  * @param open_cb_cls the closure for open_cb
3002  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3003  * @return if successful it returns the stream socket; NULL if stream cannot be
3004  *         opened 
3005  */
3006 struct GNUNET_STREAM_Socket *
3007 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3008                     const struct GNUNET_PeerIdentity *target,
3009                     GNUNET_MESH_ApplicationType app_port,
3010                     GNUNET_STREAM_OpenCallback open_cb,
3011                     void *open_cb_cls,
3012                     ...)
3013 {
3014   struct GNUNET_STREAM_Socket *socket;
3015   enum GNUNET_STREAM_Option option;
3016   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3017   va_list vargs;
3018   uint16_t payload_size;
3019
3020   LOG (GNUNET_ERROR_TYPE_DEBUG,
3021        "%s\n", __func__);
3022   GNUNET_assert (NULL != open_cb);
3023   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3024   socket->other_peer = *target;
3025   socket->open_cb = open_cb;
3026   socket->open_cls = open_cb_cls;
3027   /* Set defaults */
3028   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3029   socket->testing_active = GNUNET_NO;
3030   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3031   va_start (vargs, open_cb_cls); /* Parse variable args */
3032   do {
3033     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3034     switch (option)
3035     {
3036     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3037       /* Expect struct GNUNET_TIME_Relative */
3038       socket->retransmit_timeout = va_arg (vargs,
3039                                            struct GNUNET_TIME_Relative);
3040       break;
3041     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3042       socket->testing_active = GNUNET_YES;
3043       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3044                                                                 uint32_t);
3045       break;
3046     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3047       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3048       break;
3049     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3050       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3051       break;
3052     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3053       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3054       GNUNET_assert (0 != payload_size);
3055       if (payload_size < socket->max_payload_size)
3056         socket->max_payload_size = payload_size;
3057       break;
3058     case GNUNET_STREAM_OPTION_END:
3059       break;
3060     }
3061   } while (GNUNET_STREAM_OPTION_END != option);
3062   va_end (vargs);               /* End of variable args parsing */
3063   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3064                                       socket, /* cls */
3065                                       NULL, /* No inbound tunnel handler */
3066                                       NULL, /* No in-tunnel cleaner */
3067                                       client_message_handlers,
3068                                       ports); /* We don't get inbound tunnels */
3069   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3070   {
3071     GNUNET_free (socket);
3072     return NULL;
3073   }
3074   /* Now create the mesh tunnel to target */
3075   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3076   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3077                                               NULL, /* Tunnel context */
3078                                               &mesh_peer_connect_callback,
3079                                               &mesh_peer_disconnect_callback,
3080                                               socket);
3081   GNUNET_assert (NULL != socket->tunnel);
3082   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3083                                         &socket->other_peer);
3084   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3085   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3086   return socket;
3087 }
3088
3089
3090 /**
3091  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3092  *
3093  * @param socket the stream socket
3094  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3095  * @param completion_cb the callback that will be called upon successful
3096  *          shutdown of given operation
3097  * @param completion_cls the closure for the completion callback
3098  * @return the shutdown handle
3099  */
3100 struct GNUNET_STREAM_ShutdownHandle *
3101 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3102                         int operation,
3103                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3104                         void *completion_cls)
3105 {
3106   struct GNUNET_STREAM_ShutdownHandle *handle;
3107   struct GNUNET_STREAM_MessageHeader *msg;
3108   
3109   GNUNET_assert (NULL == socket->shutdown_handle);
3110   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3111   handle->socket = socket;
3112   handle->completion_cb = completion_cb;
3113   handle->completion_cls = completion_cls;
3114   socket->shutdown_handle = handle;
3115   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3116        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3117        || ((GNUNET_YES == socket->transmit_closed) 
3118            && (GNUNET_YES == socket->receive_closed)
3119            && (SHUT_RDWR == operation)) )
3120   {
3121     handle->operation = operation;
3122     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3123                                                           socket);
3124     return handle;
3125   }
3126   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3127   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3128   switch (operation)
3129   {
3130   case SHUT_RD:
3131     handle->operation = SHUT_RD;
3132     if (NULL != socket->read_handle)
3133       LOG (GNUNET_ERROR_TYPE_WARNING,
3134            "Existing read handle should be cancelled before shutting"
3135            " down reading\n");
3136     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3137     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3138                    GNUNET_NO);
3139     socket->receive_closed = GNUNET_YES;
3140     break;
3141   case SHUT_WR:
3142     handle->operation = SHUT_WR;
3143     if (NULL != socket->write_handle)
3144       LOG (GNUNET_ERROR_TYPE_WARNING,
3145            "Existing write handle should be cancelled before shutting"
3146            " down writing\n");
3147     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3148     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3149                    GNUNET_NO);
3150     socket->transmit_closed = GNUNET_YES;
3151     break;
3152   case SHUT_RDWR:
3153     handle->operation = SHUT_RDWR;
3154     if (NULL != socket->write_handle)
3155       LOG (GNUNET_ERROR_TYPE_WARNING,
3156            "Existing write handle should be cancelled before shutting"
3157            " down writing\n");
3158     if (NULL != socket->read_handle)
3159       LOG (GNUNET_ERROR_TYPE_WARNING,
3160            "Existing read handle should be cancelled before shutting"
3161            " down reading\n");
3162     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3163     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3164     socket->transmit_closed = GNUNET_YES;
3165     socket->receive_closed = GNUNET_YES;
3166     break;
3167   default:
3168     LOG (GNUNET_ERROR_TYPE_WARNING,
3169          "GNUNET_STREAM_shutdown called with invalid value for "
3170          "parameter operation -- Ignoring\n");
3171     GNUNET_free (msg);
3172     GNUNET_free (handle);
3173     return NULL;
3174   }
3175   handle->close_msg_retransmission_task_id =
3176     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3177                                   &close_msg_retransmission_task,
3178                                   handle);
3179   return handle;
3180 }
3181
3182
3183 /**
3184  * Cancels a pending shutdown. Note that the shutdown messages may already be
3185  * sent and the stream is shutdown already for the operation given to
3186  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3187  * shutdown messages and frees the shutdown handle.
3188  *
3189  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3190  */
3191 void
3192 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3193 {
3194   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3195     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3196   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3197     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3198   handle->socket->shutdown_handle = NULL;
3199   GNUNET_free (handle);
3200 }
3201
3202
3203 /**
3204  * Closes the stream
3205  *
3206  * @param socket the stream socket
3207  */
3208 void
3209 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3210 {
3211   struct MessageQueue *head;
3212
3213   if (NULL != socket->read_handle)
3214   {
3215     LOG (GNUNET_ERROR_TYPE_WARNING,
3216          "Closing STREAM socket when a read handle is pending\n");
3217     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3218   }
3219   if (NULL != socket->write_handle)
3220   {
3221     LOG (GNUNET_ERROR_TYPE_WARNING,
3222          "Closing STREAM socket when a write handle is pending\n");
3223     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3224     //socket->write_handle = NULL;
3225   }
3226   /* Terminate the ack'ing task if they are still present */
3227   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3228   {
3229     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3230     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3231   }
3232   /* Terminate the control retransmission tasks */
3233   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3234   {
3235     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3236   }
3237   /* Clear Transmit handles */
3238   if (NULL != socket->transmit_handle)
3239   {
3240     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3241     socket->transmit_handle = NULL;
3242   }
3243   /* Clear existing message queue */
3244   while (NULL != (head = socket->queue_head)) {
3245     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3246                                  socket->queue_tail,
3247                                  head);
3248     GNUNET_free (head->message);
3249     GNUNET_free (head);
3250   }
3251   /* Close associated tunnel */
3252   if (NULL != socket->tunnel)
3253   {
3254     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3255     socket->tunnel = NULL;
3256   }
3257   /* Close mesh connection */
3258   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3259   {
3260     GNUNET_MESH_disconnect (socket->mesh);
3261     socket->mesh = NULL;
3262   }
3263   /* Close statistics connection */
3264   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3265     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3266   /* Release receive buffer */
3267   if (NULL != socket->receive_buffer)
3268   {
3269     GNUNET_free (socket->receive_buffer);
3270   }
3271   GNUNET_free (socket);
3272 }
3273
3274
3275 /**
3276  * Listens for stream connections for a specific application ports
3277  *
3278  * @param cfg the configuration to use
3279  * @param app_port the application port for which new streams will be accepted
3280  * @param listen_cb this function will be called when a peer tries to establish
3281  *            a stream with us
3282  * @param listen_cb_cls closure for listen_cb
3283  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3284  * @return listen socket, NULL for any error
3285  */
3286 struct GNUNET_STREAM_ListenSocket *
3287 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3288                       GNUNET_MESH_ApplicationType app_port,
3289                       GNUNET_STREAM_ListenCallback listen_cb,
3290                       void *listen_cb_cls,
3291                       ...)
3292 {
3293   struct GNUNET_STREAM_ListenSocket *lsocket;
3294   struct GNUNET_TIME_Relative listen_timeout;
3295   enum GNUNET_STREAM_Option option;
3296   va_list vargs;
3297   uint16_t payload_size;
3298
3299   GNUNET_assert (NULL != listen_cb);
3300   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3301   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3302   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3303   if (NULL == lsocket->lockmanager)
3304   {
3305     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3306     GNUNET_free (lsocket);
3307     return NULL;
3308   }
3309   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3310   /* Set defaults */
3311   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3312   lsocket->testing_active = GNUNET_NO;
3313   lsocket->listen_ok_cb = NULL;
3314   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3315   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3316   va_start (vargs, listen_cb_cls);
3317   do {
3318     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3319     switch (option)
3320     {
3321     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3322       lsocket->retransmit_timeout = va_arg (vargs,
3323                                             struct GNUNET_TIME_Relative);
3324       break;
3325     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3326       lsocket->testing_active = GNUNET_YES;
3327       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3328                                                                  uint32_t);
3329       break;
3330     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3331       listen_timeout = GNUNET_TIME_relative_multiply
3332         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3333       break;
3334     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3335       lsocket->listen_ok_cb = va_arg (vargs,
3336                                       GNUNET_STREAM_ListenSuccessCallback);
3337       break;
3338     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3339       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3340       GNUNET_assert (0 != payload_size);
3341       if (payload_size < lsocket->max_payload_size)
3342         lsocket->max_payload_size = payload_size;
3343       break;
3344     case GNUNET_STREAM_OPTION_END:
3345       break;
3346     }
3347   } while (GNUNET_STREAM_OPTION_END != option);
3348   va_end (vargs);
3349   lsocket->port = app_port;
3350   lsocket->listen_cb = listen_cb;
3351   lsocket->listen_cb_cls = listen_cb_cls;
3352   lsocket->locking_request = 
3353     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3354                                      (uint32_t) lsocket->port,
3355                                      &lock_status_change_cb, lsocket);
3356   lsocket->lockmanager_acquire_timeout_task =
3357     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3358                                   &lockmanager_acquire_timeout, lsocket);
3359   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3360                                                    lsocket->cfg);
3361   return lsocket;
3362 }
3363
3364
3365 /**
3366  * Closes the listen socket
3367  *
3368  * @param lsocket the listen socket
3369  */
3370 void
3371 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3372 {
3373   /* Close MESH connection */
3374   if (NULL != lsocket->mesh)
3375     GNUNET_MESH_disconnect (lsocket->mesh);
3376   if (NULL != lsocket->stat_handle)
3377     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3378   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3379   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3380     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3381   if (NULL != lsocket->locking_request)
3382     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3383   if (NULL != lsocket->lockmanager)
3384     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3385   GNUNET_free (lsocket);
3386 }
3387
3388
3389 /**
3390  * Tries to write the given data to the stream. The maximum size of data that
3391  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3392  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3393  * violation, however only the said number of maximum bytes will be written.
3394  *
3395  * @param socket the socket representing a stream
3396  * @param data the data buffer from where the data is written into the stream
3397  * @param size the number of bytes to be written from the data buffer
3398  * @param timeout the timeout period
3399  * @param write_cont the function to call upon writing some bytes into the
3400  *          stream 
3401  * @param write_cont_cls the closure
3402  *
3403  * @return handle to cancel the operation; if a previous write is pending or
3404  *           the stream has been shutdown for this operation then write_cont is
3405  *           immediately called and NULL is returned.
3406  */
3407 struct GNUNET_STREAM_IOWriteHandle *
3408 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3409                      const void *data,
3410                      size_t size,
3411                      struct GNUNET_TIME_Relative timeout,
3412                      GNUNET_STREAM_CompletionContinuation write_cont,
3413                      void *write_cont_cls)
3414 {
3415   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3416   struct GNUNET_STREAM_DataMessage *data_msg;
3417   const void *sweep;
3418   struct GNUNET_TIME_Relative ack_deadline;
3419   unsigned int num_needed_packets;
3420   unsigned int packet;
3421   uint32_t packet_size;
3422   uint32_t payload_size;
3423   uint16_t max_data_packet_size;
3424
3425   LOG (GNUNET_ERROR_TYPE_DEBUG,
3426        "%s\n", __func__);
3427   if (NULL != socket->write_handle)
3428   {
3429     GNUNET_break (0);
3430     return NULL;
3431   }
3432   switch (socket->state)
3433   {
3434   case STATE_TRANSMIT_CLOSED:
3435   case STATE_TRANSMIT_CLOSE_WAIT:
3436   case STATE_CLOSED:
3437   case STATE_CLOSE_WAIT:
3438     if (NULL != write_cont)
3439       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3440     LOG (GNUNET_ERROR_TYPE_DEBUG,
3441          "%s() END\n", __func__);
3442     return NULL;
3443   case STATE_INIT:
3444   case STATE_LISTEN:
3445   case STATE_HELLO_WAIT:
3446     if (NULL != write_cont)
3447       /* FIXME: GNUNET_STREAM_SYSERR?? */
3448       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3449     LOG (GNUNET_ERROR_TYPE_DEBUG,
3450          "%s() END\n", __func__);
3451     return NULL;
3452   case STATE_ESTABLISHED:
3453   case STATE_RECEIVE_CLOSED:
3454   case STATE_RECEIVE_CLOSE_WAIT:
3455     break;
3456   }
3457   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3458     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3459   num_needed_packets =
3460       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3461   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3462   io_handle->socket = socket;
3463   io_handle->write_cont = write_cont;
3464   io_handle->write_cont_cls = write_cont_cls;
3465   io_handle->size = size;
3466   io_handle->packets_sent = 0;
3467   sweep = data;
3468   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3469      determined from RTT */
3470   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3471   /* Divide the given buffer into packets for sending */
3472   max_data_packet_size =
3473       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3474   for (packet=0; packet < num_needed_packets; packet++)
3475   {
3476     if ((packet + 1) * socket->max_payload_size < size) 
3477     {
3478       payload_size = socket->max_payload_size;
3479       packet_size = max_data_packet_size;
3480     }
3481     else 
3482     {
3483       payload_size = size - packet * socket->max_payload_size;
3484       packet_size = 
3485           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3486     }
3487     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3488     io_handle->messages[packet]->header.header.size = htons (packet_size);
3489     io_handle->messages[packet]->header.header.type =
3490       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3491     io_handle->messages[packet]->sequence_number =
3492       htonl (socket->write_sequence_number++);
3493     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3494     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3495        determined from RTT */
3496     io_handle->messages[packet]->ack_deadline =
3497       GNUNET_TIME_relative_hton (ack_deadline);
3498     data_msg = io_handle->messages[packet];
3499     /* Copy data from given buffer to the packet */
3500     memcpy (&data_msg[1], sweep, payload_size);
3501     sweep += payload_size;
3502     socket->write_offset += payload_size;
3503   }
3504   /* ack the last data message. FIXME: remove when we figure out how to do this
3505      using RTT */
3506   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3507       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3508   socket->write_handle = io_handle;
3509   write_data (socket);
3510   LOG (GNUNET_ERROR_TYPE_DEBUG,
3511        "%s() END\n", __func__);
3512   return io_handle;
3513 }
3514
3515
3516 /**
3517  * Tries to read data from the stream.
3518  *
3519  * @param socket the socket representing a stream
3520  * @param timeout the timeout period
3521  * @param proc function to call with data (once only)
3522  * @param proc_cls the closure for proc
3523  *
3524  * @return handle to cancel the operation; NULL is returned if: the stream has
3525  *           been shutdown for this type of opeartion (the DataProcessor is
3526  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3527  *           read handle is present (only one read handle per socket is present
3528  *           at any time)
3529  */
3530 struct GNUNET_STREAM_IOReadHandle *
3531 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3532                     struct GNUNET_TIME_Relative timeout,
3533                     GNUNET_STREAM_DataProcessor proc,
3534                     void *proc_cls)
3535 {
3536   struct GNUNET_STREAM_IOReadHandle *read_handle;
3537   
3538   LOG (GNUNET_ERROR_TYPE_DEBUG,
3539        "%s: %s()\n", 
3540        GNUNET_i2s (&socket->other_peer),
3541        __func__);
3542   /* Return NULL if there is already a read handle; the user has to cancel that
3543      first before continuing or has to wait until it is completed */
3544   if (NULL != socket->read_handle) 
3545     return NULL;
3546   GNUNET_assert (NULL != proc);
3547   switch (socket->state)
3548   {
3549   case STATE_RECEIVE_CLOSED:
3550   case STATE_RECEIVE_CLOSE_WAIT:
3551   case STATE_CLOSED:
3552   case STATE_CLOSE_WAIT:
3553     LOG (GNUNET_ERROR_TYPE_DEBUG,
3554          "%s: %s() END\n",
3555          GNUNET_i2s (&socket->other_peer),
3556          __func__);
3557     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3558     return NULL;
3559   default:
3560     break;
3561   }
3562   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3563   read_handle->proc = proc;
3564   read_handle->proc_cls = proc_cls;
3565   read_handle->socket = socket;
3566   socket->read_handle = read_handle;
3567   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3568                                           0))
3569     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3570                                                           socket);   
3571   read_handle->read_io_timeout_task_id =
3572       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3573   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3574        GNUNET_i2s (&socket->other_peer), __func__);
3575   return read_handle;
3576 }
3577
3578
3579 /**
3580  * Cancel pending write operation.
3581  *
3582  * @param ioh handle to operation to cancel
3583  */
3584 void
3585 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3586 {
3587   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3588   unsigned int packet;
3589
3590   GNUNET_assert (NULL != socket->write_handle);
3591   GNUNET_assert (socket->write_handle == ioh);
3592
3593   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3594   {
3595     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3596     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3597   }
3598
3599   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3600   {
3601     if (NULL == ioh->messages[packet]) break;
3602     GNUNET_free (ioh->messages[packet]);
3603   }
3604       
3605   GNUNET_free (socket->write_handle);
3606   socket->write_handle = NULL;
3607 }
3608
3609
3610 /**
3611  * Cancel pending read operation.
3612  *
3613  * @param ioh handle to operation to cancel
3614  */
3615 void
3616 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3617 {
3618   struct GNUNET_STREAM_Socket *socket;
3619   
3620   socket = ioh->socket;
3621   GNUNET_assert (NULL != socket->read_handle);
3622   GNUNET_assert (ioh == socket->read_handle);
3623   /* Read io time task should be there; if it is already executed then this
3624   read handle is not valid; However upon scheduler shutdown the read io task
3625   may be executed before */
3626   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3627     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3628   /* reading task may be present; if so we have to stop it */
3629   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3630     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3631   GNUNET_free (ioh);
3632   socket->read_handle = NULL;
3633 }
3634
3635 /* end of stream_api.c */