- fix 2699
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_util_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_WriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_ReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * Mesh transmit timeout
285    */
286   struct GNUNET_TIME_Relative mesh_retry_timeout;
287
288   /**
289    * Data retransmission timeout
290    */
291   struct GNUNET_TIME_Relative data_retransmit_timeout;
292
293   /**
294    * The state of the protocol associated with this socket
295    */
296   enum State state;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * Is receive closed
305    */
306   int receive_closed;
307
308   /**
309    * Is transmission closed
310    */
311   int transmit_closed;
312
313   /**
314    * The application port number (type: uint32_t)
315    */
316   GNUNET_MESH_ApplicationType app_port;
317
318   /**
319    * The write sequence number to be set incase of testing
320    */
321   uint32_t testing_set_write_sequence_number_value;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363
364   /**
365    * The maximum size of the data message payload this stream handle can send
366    */
367   uint16_t max_payload_size;
368 };
369
370
371 /**
372  * A socket for listening
373  */
374 struct GNUNET_STREAM_ListenSocket
375 {
376   /**
377    * The mesh handle
378    */
379   struct GNUNET_MESH_Handle *mesh;
380
381   /**
382    * Handle to statistics
383    */
384   struct GNUNET_STATISTICS_Handle *stat_handle;
385
386   /**
387    * Our configuration
388    */
389   struct GNUNET_CONFIGURATION_Handle *cfg;
390
391   /**
392    * Handle to the lock manager service
393    */
394   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
395
396   /**
397    * The active LockingRequest from lockmanager
398    */
399   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
400
401   /**
402    * Callback to call after acquring a lock and listening
403    */
404   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
405
406   /**
407    * The callback function which is called after successful opening socket
408    */
409   GNUNET_STREAM_ListenCallback listen_cb;
410
411   /**
412    * The call back closure
413    */
414   void *listen_cb_cls;
415
416   /**
417    * The service port
418    */
419   GNUNET_MESH_ApplicationType port;
420   
421   /**
422    * The id of the lockmanager timeout task
423    */
424   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
425
426   /**
427    * The retransmit timeout
428    */
429   struct GNUNET_TIME_Relative retransmit_timeout;
430   
431   /**
432    * Listen enabled?
433    */
434   int listening;
435
436   /**
437    * Whether testing mode is active or not
438    */
439   int testing_active;
440
441   /**
442    * The write sequence number to be set incase of testing
443    */
444   uint32_t testing_set_write_sequence_number_value;
445
446   /**
447    * The maximum size of the data message payload this stream handle can send
448    */
449   uint16_t max_payload_size;
450
451 };
452
453
454 /**
455  * The IO Write Handle
456  */
457 struct GNUNET_STREAM_WriteHandle
458 {
459   /**
460    * The socket to which this write handle is associated
461    */
462   struct GNUNET_STREAM_Socket *socket;
463
464   /**
465    * The write continuation callback
466    */
467   GNUNET_STREAM_CompletionContinuation write_cont;
468
469   /**
470    * Write continuation closure
471    */
472   void *write_cont_cls;
473
474   /**
475    * The packet_buffers associated with this Handle
476    */
477   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
478
479   /**
480    * The bitmap of this IOHandle; Corresponding bit for a message is set when
481    * it has been acknowledged by the receiver
482    */
483   GNUNET_STREAM_AckBitmap ack_bitmap;
484
485   /**
486    * Number of bytes in this write handle
487    */
488   size_t size;
489
490   /**
491    * Number of packets already transmitted from this IO handle. Retransmitted
492    * packets are not taken into account here. This is used to determine which
493    * packets account for retransmission and which packets occupy buffer space at
494    * the receiver.
495    */
496   unsigned int packets_sent;
497
498   /**
499    * The maximum of the base numbers of the received acks
500    */
501   uint32_t max_ack_base_num;
502
503 };
504
505
506 /**
507  * The IO Read Handle
508  */
509 struct GNUNET_STREAM_ReadHandle
510 {
511   /**
512    * The socket to which this read handle is associated
513    */
514   struct GNUNET_STREAM_Socket *socket;
515   
516   /**
517    * Callback for the read processor
518    */
519   GNUNET_STREAM_DataProcessor proc;
520
521   /**
522    * The closure pointer for the read processor callback
523    */
524   void *proc_cls;
525
526   /**
527    * Task identifier for the read io timeout task
528    */
529   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
530
531   /**
532    * Task scheduled to continue a read operation.
533    */
534   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
535
536   /**
537    * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call
538    * the read processor task
539    */
540   GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id;
541 };
542
543
544 /**
545  * Handle for Shutdown
546  */
547 struct GNUNET_STREAM_ShutdownHandle
548 {
549   /**
550    * The socket associated with this shutdown handle
551    */
552   struct GNUNET_STREAM_Socket *socket;
553
554   /**
555    * Shutdown completion callback
556    */
557   GNUNET_STREAM_ShutdownCompletion completion_cb;
558
559   /**
560    * Closure for completion callback
561    */
562   void *completion_cls;
563
564   /**
565    * Close message retransmission task id
566    */
567   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
568
569   /**
570    * Task scheduled to call the shutdown continuation callback
571    */
572   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
573
574   /**
575    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
576    */
577   int operation;  
578 };
579
580
581 /**
582  * Default value in seconds for various timeouts
583  */
584 static const unsigned int default_timeout = 10;
585
586 /**
587  * The domain name for locks we use here
588  */
589 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
590
591 /**
592  * Callback function for sending queued message
593  *
594  * @param cls closure the socket
595  * @param size number of bytes available in buf
596  * @param buf where the callee should write the message
597  * @return number of bytes written to buf
598  */
599 static size_t
600 send_message_notify (void *cls, size_t size, void *buf)
601 {
602   struct GNUNET_STREAM_Socket *socket = cls;
603   struct MessageQueue *head;
604   size_t ret;
605
606   socket->transmit_handle = NULL; /* Remove the transmit handle */
607   head = socket->queue_head;
608   if (NULL == head)
609     return 0; /* just to be safe */
610   if (0 == size)                /* request timed out */
611   {
612     socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
613         (socket->mesh_retry_timeout);    
614     LOG (GNUNET_ERROR_TYPE_DEBUG,
615          "%s: Message sending to MESH timed out. Retrying in %s \n",
616          GNUNET_i2s (&socket->other_peer),
617          GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
618                                                  GNUNET_YES));
619     socket->transmit_handle = 
620         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
621                                            GNUNET_NO, /* Corking */
622                                            socket->mesh_retry_timeout,
623                                            &socket->other_peer,
624                                            ntohs (head->message->header.size),
625                                            &send_message_notify,
626                                            socket);
627     return 0;
628   }
629   ret = ntohs (head->message->header.size);
630   GNUNET_assert (size >= ret);
631   memcpy (buf, head->message, ret);
632   if (NULL != head->finish_cb)
633   {
634     head->finish_cb (head->finish_cb_cls, socket);
635   }
636   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
637                                socket->queue_tail,
638                                head);
639   GNUNET_free (head->message);
640   GNUNET_free (head);
641   if (NULL != socket->transmit_handle)
642     return ret; /* 'finish_cb' might have triggered message already! */
643   head = socket->queue_head;
644   if (NULL != head)    /* more pending messages to send */
645   {
646     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
647     socket->transmit_handle = 
648         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
649                                            GNUNET_NO, /* Corking */
650                                            socket->mesh_retry_timeout,
651                                            &socket->other_peer,
652                                            ntohs (head->message->header.size),
653                                            &send_message_notify,
654                                            socket);
655   }
656   return ret;
657 }
658
659
660 /**
661  * Queues a message for sending using the mesh connection of a socket
662  *
663  * @param socket the socket whose mesh connection is used
664  * @param message the message to be sent
665  * @param finish_cb the callback to be called when the message is sent
666  * @param finish_cb_cls the closure for the callback
667  * @param urgent set to GNUNET_YES to add the message to the beginning of the
668  *          queue; GNUNET_NO to add at the tail
669  */
670 static void
671 queue_message (struct GNUNET_STREAM_Socket *socket,
672                struct GNUNET_STREAM_MessageHeader *message,
673                SendFinishCallback finish_cb,
674                void *finish_cb_cls,
675                int urgent)
676 {
677   struct MessageQueue *queue_entity;
678
679   GNUNET_assert 
680     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
681      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
682   LOG (GNUNET_ERROR_TYPE_DEBUG,
683        "%s: Queueing message of type %d and size %d\n",
684        GNUNET_i2s (&socket->other_peer),
685        ntohs (message->header.type),
686        ntohs (message->header.size));
687   GNUNET_assert (NULL != message);
688   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
689   queue_entity->message = message;
690   queue_entity->finish_cb = finish_cb;
691   queue_entity->finish_cb_cls = finish_cb_cls;
692   if (GNUNET_YES == urgent)
693   {
694     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
695                                  queue_entity);
696     if (NULL != socket->transmit_handle)
697     {
698       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
699       socket->transmit_handle = NULL;
700     }
701   }
702   else
703     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
704                                       socket->queue_tail,
705                                       queue_entity);
706   if (NULL == socket->transmit_handle)
707   {
708     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
709     socket->transmit_handle = 
710         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
711                                            GNUNET_NO, /* Corking */
712                                            socket->mesh_retry_timeout,
713                                            &socket->other_peer,
714                                            ntohs (message->header.size),
715                                            &send_message_notify,
716                                            socket);
717   }
718 }
719
720
721 /**
722  * Copies a message and queues it for sending using the mesh connection of
723  * given socket 
724  *
725  * @param socket the socket whose mesh connection is used
726  * @param message the message to be sent
727  * @param finish_cb the callback to be called when the message is sent
728  * @param finish_cb_cls the closure for the callback
729  */
730 static void
731 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
732                         const struct GNUNET_STREAM_MessageHeader *message,
733                         SendFinishCallback finish_cb,
734                         void *finish_cb_cls)
735 {
736   struct GNUNET_STREAM_MessageHeader *msg_copy;
737   uint16_t size;
738   
739   size = ntohs (message->header.size);
740   msg_copy = GNUNET_malloc (size);
741   memcpy (msg_copy, message, size);
742   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
743 }
744
745
746 /**
747  * Writes data using the given socket. The amount of data written is limited by
748  * the receiver_window_size
749  *
750  * @param socket the socket to use
751  */
752 static void 
753 write_data (struct GNUNET_STREAM_Socket *socket);
754
755
756 /**
757  * Task for retransmitting data messages if they aren't ACK before their ack
758  * deadline 
759  *
760  * @param cls the socket
761  * @param tc the Task context
762  */
763 static void
764 data_retransmission_task (void *cls,
765                           const struct GNUNET_SCHEDULER_TaskContext *tc)
766 {
767   struct GNUNET_STREAM_Socket *socket = cls;
768   
769   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
770   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
771     return;
772   LOG (GNUNET_ERROR_TYPE_DEBUG,
773        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
774   write_data (socket);
775 }
776
777
778 /**
779  * Task for sending ACK message
780  *
781  * @param cls the socket
782  * @param tc the Task context
783  */
784 static void
785 ack_task (void *cls,
786           const struct GNUNET_SCHEDULER_TaskContext *tc)
787 {
788   struct GNUNET_STREAM_Socket *socket = cls;
789   struct GNUNET_STREAM_AckMessage *ack_msg;
790
791   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
792   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
793     return;
794   /* Create the ACK Message */
795   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
796   ack_msg->header.header.size = htons (sizeof (struct 
797                                                GNUNET_STREAM_AckMessage));
798   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
799   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
800   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
801   ack_msg->receive_window_remaining = 
802     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
803   /* Queue up ACK for immediate sending */
804   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
805 }
806
807
808 /**
809  * Retransmission task for shutdown messages
810  *
811  * @param cls the shutdown handle
812  * @param tc the Task Context
813  */
814 static void
815 close_msg_retransmission_task (void *cls,
816                                const struct GNUNET_SCHEDULER_TaskContext *tc)
817 {
818   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
819   struct GNUNET_STREAM_MessageHeader *msg;
820   struct GNUNET_STREAM_Socket *socket;
821
822   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
823   GNUNET_assert (NULL != shutdown_handle);
824   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
825     return;
826   socket = shutdown_handle->socket;
827   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
828   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
829   switch (shutdown_handle->operation)
830   {
831   case SHUT_RDWR:
832     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
833     break;
834   case SHUT_RD:
835     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
836     break;
837   case SHUT_WR:
838     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
839     break;
840   default:
841     GNUNET_free (msg);
842     shutdown_handle->close_msg_retransmission_task_id = 
843       GNUNET_SCHEDULER_NO_TASK;
844     return;
845   }
846   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
847   shutdown_handle->close_msg_retransmission_task_id =
848     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
849                                   &close_msg_retransmission_task,
850                                   shutdown_handle);
851 }
852
853
854 /**
855  * Function to modify a bit in GNUNET_STREAM_AckBitmap
856  *
857  * @param bitmap the bitmap to modify
858  * @param bit the bit number to modify
859  * @param value GNUNET_YES to on, GNUNET_NO to off
860  */
861 static void
862 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
863                       unsigned int bit, 
864                       int value)
865 {
866   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
867   if (GNUNET_YES == value)
868     *bitmap |= (1LL << bit);
869   else
870     *bitmap &= ~(1LL << bit);
871 }
872
873
874 /**
875  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
876  *
877  * @param bitmap address of the bitmap that has to be checked
878  * @param bit the bit number to check
879  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
880  */
881 static uint8_t
882 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
883                       unsigned int bit)
884 {
885   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
886   return 0 != (*bitmap & (1LL << bit));
887 }
888
889
890 /**
891  * Writes data using the given socket. The amount of data written is limited by
892  * the receiver_window_size
893  *
894  * @param socket the socket to use
895  */
896 static void 
897 write_data (struct GNUNET_STREAM_Socket *socket)
898 {
899   struct GNUNET_STREAM_WriteHandle *io_handle = socket->write_handle;
900   unsigned int packet;
901   
902   for (packet=0; packet < io_handle->packets_sent; packet++)
903   {
904     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
905                                            packet))
906     {
907       LOG (GNUNET_ERROR_TYPE_DEBUG,
908            "%s: Retransmitting DATA message with sequence %u\n",
909            GNUNET_i2s (&socket->other_peer),
910            ntohl (io_handle->messages[packet]->sequence_number));
911       copy_and_queue_message (socket,
912                               &io_handle->messages[packet]->header,
913                               NULL,
914                               NULL);
915     }
916   }
917   /* Now send new packets if there is enough buffer space */
918   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
919          (NULL != io_handle->messages[packet]) &&
920          (socket->receiver_window_available 
921           >= ntohs (io_handle->messages[packet]->header.header.size)))
922   {
923     socket->receiver_window_available -= 
924       ntohs (io_handle->messages[packet]->header.header.size);
925     LOG (GNUNET_ERROR_TYPE_DEBUG,
926          "%s: Placing DATA message with sequence %u in send queue\n",
927          GNUNET_i2s (&socket->other_peer),
928          ntohl (io_handle->messages[packet]->sequence_number));
929     copy_and_queue_message (socket,
930                             &io_handle->messages[packet]->header,
931                             NULL,
932                             NULL);
933     packet++;
934   }
935   io_handle->packets_sent = packet;
936   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
937   {
938     socket->data_retransmit_timeout = GNUNET_TIME_STD_BACKOFF
939         (socket->data_retransmit_timeout);
940     socket->data_retransmission_task_id = 
941         GNUNET_SCHEDULER_add_delayed (socket->data_retransmit_timeout,
942                                       &data_retransmission_task,
943                                       socket);
944   }
945 }
946
947
948 /**
949  * Cleansup the sockets read handle
950  *
951  * @param socket the socket whose read handle has to be cleanedup
952  */
953 static void
954 cleanup_read_handle (struct GNUNET_STREAM_Socket *socket)
955 {
956   struct GNUNET_STREAM_ReadHandle *read_handle;
957
958   read_handle = socket->read_handle;
959   /* Read io time task should be there; if it is already executed then this
960   read handle is not valid; However upon scheduler shutdown the read io task
961   may be executed before */
962   if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id)
963     GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
964   /* reading task may be present; if so we have to stop it */
965   if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id)
966     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
967   if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id)
968     GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id);
969   GNUNET_free (read_handle);
970   socket->read_handle = NULL;
971 }
972
973
974 /**
975  * Task for calling the read processor
976  *
977  * @param cls the socket
978  * @param tc the task context
979  */
980 static void
981 call_read_processor (void *cls,
982                      const struct GNUNET_SCHEDULER_TaskContext *tc)
983 {
984   struct GNUNET_STREAM_Socket *socket = cls;
985   struct GNUNET_STREAM_ReadHandle *read_handle;
986   GNUNET_STREAM_DataProcessor proc;
987   void *proc_cls;
988   size_t read_size;
989   size_t valid_read_size;
990   unsigned int packet;
991   uint32_t sequence_increase;
992   uint32_t offset_increase;
993
994   read_handle = socket->read_handle;
995   GNUNET_assert (NULL != read_handle);
996   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
997   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
998     return;
999   if (NULL == socket->receive_buffer) 
1000     return;
1001   GNUNET_assert (NULL != read_handle->proc);
1002   /* Check the bitmap for any holes */
1003   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1004   {
1005     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
1006                                            packet))
1007       break;
1008   }
1009   /* We only call read processor if we have the first packet */
1010   GNUNET_assert (0 < packet);
1011   valid_read_size = 
1012     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
1013   GNUNET_assert (0 != valid_read_size);
1014   proc = read_handle->proc;
1015   proc_cls = read_handle->proc_cls;
1016   cleanup_read_handle (socket);
1017   /* Call the data processor */
1018   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
1019        GNUNET_i2s (&socket->other_peer));
1020   read_size = proc (proc_cls, GNUNET_STREAM_OK,
1021                     socket->receive_buffer + socket->copy_offset,
1022                     valid_read_size);
1023   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
1024        GNUNET_i2s (&socket->other_peer), read_size);
1025   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
1026        GNUNET_i2s (&socket->other_peer));
1027   GNUNET_assert (read_size <= valid_read_size);
1028   socket->copy_offset += read_size;
1029   /* Determine upto which packet we can remove from the buffer */
1030   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1031   {
1032     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1033     { 
1034       packet++; 
1035       break;
1036     }
1037     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1038       break;
1039   }
1040   /* If no packets can be removed we can't move the buffer */
1041   if (0 == packet) 
1042     return;
1043   sequence_increase = packet;
1044   LOG (GNUNET_ERROR_TYPE_DEBUG,
1045        "%s: Sequence increase after read processor completion: %u\n",
1046        GNUNET_i2s (&socket->other_peer), sequence_increase);
1047   /* Shift the data in the receive buffer */
1048   socket->receive_buffer = 
1049     memmove (socket->receive_buffer,
1050              socket->receive_buffer 
1051              + socket->receive_buffer_boundaries[sequence_increase-1],
1052              socket->receive_buffer_size
1053              - socket->receive_buffer_boundaries[sequence_increase-1]);
1054   /* Shift the bitmap */
1055   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1056   /* Set read_sequence_number */
1057   socket->read_sequence_number += sequence_increase;
1058   /* Set read_offset */
1059   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1060   socket->read_offset += offset_increase;
1061   /* Fix copy_offset */
1062   GNUNET_assert (offset_increase <= socket->copy_offset);
1063   socket->copy_offset -= offset_increase;
1064   /* Fix relative boundaries */
1065   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1066   {
1067     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1068     {
1069       uint32_t ahead_buffer_boundary;
1070
1071       ahead_buffer_boundary = 
1072         socket->receive_buffer_boundaries[packet + sequence_increase];
1073       if (0 == ahead_buffer_boundary)
1074         socket->receive_buffer_boundaries[packet] = 0;
1075       else
1076       {
1077         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1078         socket->receive_buffer_boundaries[packet] = 
1079           ahead_buffer_boundary - offset_increase;
1080       }
1081     }
1082     else
1083       socket->receive_buffer_boundaries[packet] = 0;
1084   }
1085 }
1086
1087
1088 /**
1089  * Cancels the existing read io handle
1090  *
1091  * @param cls the closure from the SCHEDULER call
1092  * @param tc the task context
1093  */
1094 static void
1095 read_io_timeout (void *cls,
1096                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1097 {
1098   struct GNUNET_STREAM_Socket *socket = cls;
1099   struct GNUNET_STREAM_ReadHandle *read_handle;
1100   GNUNET_STREAM_DataProcessor proc;
1101   void *proc_cls;
1102
1103   read_handle = socket->read_handle;
1104   GNUNET_assert (NULL != read_handle);
1105   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1106   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1107     return;
1108   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1109   {
1110     LOG (GNUNET_ERROR_TYPE_DEBUG,
1111          "%s: Read task timedout - Cancelling it\n",
1112          GNUNET_i2s (&socket->other_peer));
1113     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1114     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1115   }
1116   proc = read_handle->proc;
1117   proc_cls = read_handle->proc_cls;
1118   GNUNET_free (read_handle);
1119   socket->read_handle = NULL;
1120   /* Call the read processor to signal timeout */
1121   proc (proc_cls,
1122         GNUNET_STREAM_TIMEOUT,
1123         NULL,
1124         0);
1125 }
1126
1127
1128 /**
1129  * Handler for DATA messages; Same for both client and server
1130  *
1131  * @param socket the socket through which the ack was received
1132  * @param tunnel connection to the other end
1133  * @param sender who sent the message
1134  * @param msg the data message
1135  * @param atsi performance data for the connection
1136  * @return GNUNET_OK to keep the connection open,
1137  *         GNUNET_SYSERR to close it (signal serious error)
1138  */
1139 static int
1140 handle_data (struct GNUNET_STREAM_Socket *socket,
1141              struct GNUNET_MESH_Tunnel *tunnel,
1142              const struct GNUNET_PeerIdentity *sender,
1143              const struct GNUNET_STREAM_DataMessage *msg,
1144              const struct GNUNET_ATS_Information*atsi)
1145 {
1146   const void *payload;
1147   struct GNUNET_TIME_Relative ack_deadline_rel;
1148   uint32_t bytes_needed;
1149   uint32_t relative_offset;
1150   uint32_t relative_sequence_number;
1151   uint16_t size;
1152
1153   size = htons (msg->header.header.size);
1154   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1155   {
1156     GNUNET_break_op (0);
1157     return GNUNET_SYSERR;
1158   }
1159   if (0 != memcmp (sender, &socket->other_peer,
1160                    sizeof (struct GNUNET_PeerIdentity)))
1161   {
1162     LOG (GNUNET_ERROR_TYPE_DEBUG,
1163          "%s: Received DATA from non-confirming peer\n",
1164          GNUNET_i2s (&socket->other_peer));
1165     return GNUNET_YES;
1166   }
1167   switch (socket->state)
1168   {
1169   case STATE_ESTABLISHED:
1170   case STATE_TRANSMIT_CLOSED:
1171   case STATE_TRANSMIT_CLOSE_WAIT:      
1172     /* check if the message's sequence number is in the range we are
1173        expecting */
1174     relative_sequence_number = 
1175       ntohl (msg->sequence_number) - socket->read_sequence_number;
1176     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1177     {
1178       LOG (GNUNET_ERROR_TYPE_DEBUG,
1179            "%s: Ignoring received message with sequence number %u\n",
1180            GNUNET_i2s (&socket->other_peer),
1181            ntohl (msg->sequence_number));
1182       /* Start ACK sending task if one is not already present */
1183       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1184       {
1185         socket->ack_task_id = 
1186           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1187                                         (msg->ack_deadline),
1188                                         &ack_task,
1189                                         socket);
1190       }
1191       return GNUNET_YES;
1192     }      
1193     /* Check if we have already seen this message */
1194     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1195                                             relative_sequence_number))
1196     {
1197       LOG (GNUNET_ERROR_TYPE_DEBUG,
1198            "%s: Ignoring already received message with sequence number %u\n",
1199            GNUNET_i2s (&socket->other_peer),
1200            ntohl (msg->sequence_number));
1201       /* Start ACK sending task if one is not already present */
1202       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1203       {
1204         socket->ack_task_id = 
1205           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1206                                         (msg->ack_deadline), &ack_task, socket);
1207       }
1208       return GNUNET_YES;
1209     }
1210     LOG (GNUNET_ERROR_TYPE_DEBUG,
1211          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1212          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1213          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1214     /* Check if we have to allocate the buffer */
1215     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1216     relative_offset = ntohl (msg->offset) - socket->read_offset;
1217     bytes_needed = relative_offset + size;
1218     if (bytes_needed > socket->receive_buffer_size)
1219     {
1220       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1221       {
1222         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1223                                                  bytes_needed);
1224         socket->receive_buffer_size = bytes_needed;
1225       }
1226       else
1227       {
1228         LOG (GNUNET_ERROR_TYPE_DEBUG,
1229              "%s: Cannot accommodate packet %d as buffer is full\n",
1230              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1231         return GNUNET_YES;
1232       }
1233     }
1234     /* Copy Data to buffer */
1235     payload = &msg[1];
1236     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1237     memcpy (socket->receive_buffer + relative_offset, payload, size);
1238     socket->receive_buffer_boundaries[relative_sequence_number] = 
1239         relative_offset + size;
1240     /* Modify the ACK bitmap */
1241     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1242                           GNUNET_YES);
1243     /* Start ACK sending task if one is not already present */
1244     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1245     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1246     {
1247       ack_deadline_rel = 
1248           GNUNET_TIME_relative_min (ack_deadline_rel,
1249                                     GNUNET_TIME_relative_multiply
1250                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1251       socket->ack_task_id = 
1252           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1253                                         (msg->ack_deadline), &ack_task, socket);
1254       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1255       socket->ack_time_deadline = ack_deadline_rel;
1256     }
1257     else
1258     {
1259       struct GNUNET_TIME_Relative ack_time_past;
1260       struct GNUNET_TIME_Relative ack_time_remaining;
1261       struct GNUNET_TIME_Relative ack_time_min;
1262       ack_time_past = 
1263           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1264       ack_time_remaining = GNUNET_TIME_relative_subtract
1265           (socket->ack_time_deadline, ack_time_past);
1266       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1267                                                ack_deadline_rel);
1268       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1269                       sizeof (struct GNUNET_TIME_Relative)))
1270       {
1271         ack_deadline_rel = ack_time_min;
1272         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1273         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1274                                                             &ack_task, socket);
1275         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1276         socket->ack_time_deadline = ack_deadline_rel;
1277       }
1278     }
1279     if ((NULL != socket->read_handle) /* A read handle is waiting */
1280         /* There is no current read task */
1281         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1282         /* We have the first packet */
1283         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1284     {
1285       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1286            GNUNET_i2s (&socket->other_peer));
1287       socket->read_handle->read_task_id =
1288           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1289     }
1290     break;
1291   default:
1292     LOG (GNUNET_ERROR_TYPE_DEBUG,
1293          "%s: Received data message when it cannot be handled\n",
1294          GNUNET_i2s (&socket->other_peer));
1295     break;
1296   }
1297   return GNUNET_YES;
1298 }
1299
1300
1301 /**
1302  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1303  *
1304  * @param cls the socket (set from GNUNET_MESH_connect)
1305  * @param tunnel connection to the other end
1306  * @param tunnel_ctx place to store local state associated with the tunnel
1307  * @param sender who sent the message
1308  * @param message the actual message
1309  * @param atsi performance data for the connection
1310  * @return GNUNET_OK to keep the connection open,
1311  *         GNUNET_SYSERR to close it (signal serious error)
1312  */
1313 static int
1314 client_handle_data (void *cls,
1315                     struct GNUNET_MESH_Tunnel *tunnel,
1316                     void **tunnel_ctx,
1317                     const struct GNUNET_PeerIdentity *sender,
1318                     const struct GNUNET_MessageHeader *message,
1319                     const struct GNUNET_ATS_Information*atsi)
1320 {
1321   struct GNUNET_STREAM_Socket *socket = cls;
1322
1323   return handle_data (socket, tunnel, sender, 
1324                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1325 }
1326
1327
1328 /**
1329  * Callback to set state to ESTABLISHED
1330  *
1331  * @param cls the closure NULL;
1332  * @param socket the socket to requiring state change
1333  */
1334 static void
1335 set_state_established (void *cls,
1336                        struct GNUNET_STREAM_Socket *socket)
1337 {
1338   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1339        "%s: Attaining ESTABLISHED state\n",
1340        GNUNET_i2s (&socket->other_peer));
1341   socket->write_offset = 0;
1342   socket->read_offset = 0;
1343   socket->state = STATE_ESTABLISHED;
1344   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1345                  socket->control_retransmission_task_id);
1346   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1347   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1348   if (NULL != socket->lsocket)
1349   {
1350     LOG (GNUNET_ERROR_TYPE_DEBUG,
1351          "%s: Calling listen callback\n",
1352          GNUNET_i2s (&socket->other_peer));
1353     if (GNUNET_SYSERR == 
1354         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1355                                     socket,
1356                                     &socket->other_peer))
1357     {
1358       socket->state = STATE_CLOSED;
1359       /* FIXME: We should close in a decent way (send RST) */
1360       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1361       GNUNET_free (socket);
1362     }
1363   }
1364   else
1365     socket->open_cb (socket->open_cls, socket);
1366 }
1367
1368
1369 /**
1370  * Callback to set state to HELLO_WAIT
1371  *
1372  * @param cls the closure from queue_message
1373  * @param socket the socket to requiring state change
1374  */
1375 static void
1376 set_state_hello_wait (void *cls,
1377                       struct GNUNET_STREAM_Socket *socket)
1378 {
1379   GNUNET_assert (STATE_INIT == socket->state);
1380   LOG (GNUNET_ERROR_TYPE_DEBUG,
1381        "%s: Attaining HELLO_WAIT state\n",
1382        GNUNET_i2s (&socket->other_peer));
1383   socket->state = STATE_HELLO_WAIT;
1384 }
1385
1386
1387 /**
1388  * Callback to set state to CLOSE_WAIT
1389  *
1390  * @param cls the closure from queue_message
1391  * @param socket the socket requiring state change
1392  */
1393 static void
1394 set_state_close_wait (void *cls,
1395                       struct GNUNET_STREAM_Socket *socket)
1396 {
1397   LOG (GNUNET_ERROR_TYPE_DEBUG,
1398        "%s: Attaing CLOSE_WAIT state\n",
1399        GNUNET_i2s (&socket->other_peer));
1400   socket->state = STATE_CLOSE_WAIT;
1401   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1402   socket->receive_buffer = NULL;
1403   socket->receive_buffer_size = 0;
1404 }
1405
1406
1407 /**
1408  * Callback to set state to RECEIVE_CLOSE_WAIT
1409  *
1410  * @param cls the closure from queue_message
1411  * @param socket the socket requiring state change
1412  */
1413 static void
1414 set_state_receive_close_wait (void *cls,
1415                               struct GNUNET_STREAM_Socket *socket)
1416 {
1417   LOG (GNUNET_ERROR_TYPE_DEBUG,
1418        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1419        GNUNET_i2s (&socket->other_peer));
1420   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1421   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1422   socket->receive_buffer = NULL;
1423   socket->receive_buffer_size = 0;
1424 }
1425
1426
1427 /**
1428  * Callback to set state to TRANSMIT_CLOSE_WAIT
1429  *
1430  * @param cls the closure from queue_message
1431  * @param socket the socket requiring state change
1432  */
1433 static void
1434 set_state_transmit_close_wait (void *cls,
1435                                struct GNUNET_STREAM_Socket *socket)
1436 {
1437   LOG (GNUNET_ERROR_TYPE_DEBUG,
1438        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1439        GNUNET_i2s (&socket->other_peer));
1440   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1441 }
1442
1443
1444 /**
1445  * Callback to set state to CLOSED
1446  *
1447  * @param cls the closure from queue_message
1448  * @param socket the socket requiring state change
1449  */
1450 static void
1451 set_state_closed (void *cls,
1452                   struct GNUNET_STREAM_Socket *socket)
1453 {
1454   socket->state = STATE_CLOSED;
1455 }
1456
1457
1458 /**
1459  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1460  *
1461  * @return the generate hello message
1462  */
1463 static struct GNUNET_STREAM_MessageHeader *
1464 generate_hello (void)
1465 {
1466   struct GNUNET_STREAM_MessageHeader *msg;
1467
1468   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1469   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1470   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1471   return msg;
1472 }
1473
1474
1475 /**
1476  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1477  * socket
1478  *
1479  * @param socket the socket for which this HelloAckMessage has to be generated
1480  * @param generate_seq GNUNET_YES to generate the write sequence number,
1481  *          GNUNET_NO to use the existing sequence number
1482  * @return the HelloAckMessage
1483  */
1484 static struct GNUNET_STREAM_HelloAckMessage *
1485 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1486                     int generate_seq)
1487 {
1488   struct GNUNET_STREAM_HelloAckMessage *msg;
1489
1490   if (GNUNET_YES == generate_seq)
1491   {
1492     if (GNUNET_YES == socket->testing_active)
1493       socket->write_sequence_number =
1494         socket->testing_set_write_sequence_number_value;
1495     else
1496       socket->write_sequence_number = 
1497         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1498     LOG_DEBUG ("%s: write sequence number %u\n",
1499                GNUNET_i2s (&socket->other_peer),
1500                (unsigned int) socket->write_sequence_number);
1501   }
1502   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1503   msg->header.header.size = 
1504     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1505   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1506   msg->sequence_number = htonl (socket->write_sequence_number);
1507   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1508   return msg;
1509 }
1510
1511
1512 /**
1513  * Task for retransmitting control messages if they aren't ACK'ed before a
1514  * deadline
1515  *
1516  * @param cls the socket
1517  * @param tc the Task context
1518  */
1519 static void
1520 control_retransmission_task (void *cls,
1521                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1522 {
1523   struct GNUNET_STREAM_Socket *socket = cls;
1524     
1525   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1526   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1527     return;
1528   LOG_DEBUG ("%s: Retransmitting a control message\n",
1529                  GNUNET_i2s (&socket->other_peer));
1530   switch (socket->state)
1531   {
1532   case STATE_INIT:    
1533     GNUNET_break (0);
1534     break;
1535   case STATE_LISTEN:
1536     GNUNET_break (0);
1537     break;
1538   case STATE_HELLO_WAIT:
1539     if (NULL == socket->lsocket) /* We are client */
1540       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1541     else
1542       queue_message (socket,
1543                      (struct GNUNET_STREAM_MessageHeader *)
1544                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1545                      GNUNET_NO);
1546     socket->control_retransmission_task_id =
1547     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1548                                   &control_retransmission_task, socket);
1549     break;
1550   case STATE_ESTABLISHED:
1551     if (NULL == socket->lsocket)
1552       queue_message (socket,
1553                      (struct GNUNET_STREAM_MessageHeader *)
1554                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1555                      GNUNET_NO);
1556     else
1557       GNUNET_break (0);
1558     break;
1559   default:
1560     GNUNET_break (0);
1561   }  
1562 }
1563
1564
1565 /**
1566  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1567  *
1568  * @param cls the socket (set from GNUNET_MESH_connect)
1569  * @param tunnel connection to the other end
1570  * @param tunnel_ctx this is NULL
1571  * @param sender who sent the message
1572  * @param message the actual message
1573  * @param atsi performance data for the connection
1574  * @return GNUNET_OK to keep the connection open,
1575  *         GNUNET_SYSERR to close it (signal serious error)
1576  */
1577 static int
1578 client_handle_hello_ack (void *cls,
1579                          struct GNUNET_MESH_Tunnel *tunnel,
1580                          void **tunnel_ctx,
1581                          const struct GNUNET_PeerIdentity *sender,
1582                          const struct GNUNET_MessageHeader *message,
1583                          const struct GNUNET_ATS_Information*atsi)
1584 {
1585   struct GNUNET_STREAM_Socket *socket = cls;
1586   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1587   struct GNUNET_STREAM_HelloAckMessage *reply;
1588
1589   if (0 != memcmp (sender, &socket->other_peer,
1590                    sizeof (struct GNUNET_PeerIdentity)))
1591   {
1592     LOG (GNUNET_ERROR_TYPE_DEBUG,
1593          "%s: Received HELLO_ACK from non-confirming peer\n",
1594          GNUNET_i2s (&socket->other_peer));
1595     return GNUNET_YES;
1596   }
1597   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1598   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1599        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1600   GNUNET_assert (socket->tunnel == tunnel);
1601   switch (socket->state)
1602   {
1603   case STATE_HELLO_WAIT:
1604     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1605     LOG (GNUNET_ERROR_TYPE_DEBUG,
1606          "%s: Read sequence number %u\n",
1607          GNUNET_i2s (&socket->other_peer),
1608          (unsigned int) socket->read_sequence_number);
1609     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1610     reply = generate_hello_ack (socket, GNUNET_YES);
1611     queue_message (socket, &reply->header, &set_state_established,
1612                    NULL, GNUNET_NO);    
1613     return GNUNET_OK;
1614   case STATE_ESTABLISHED:
1615     // call statistics (# ACKs ignored++)
1616     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1617                    socket->control_retransmission_task_id);
1618     socket->control_retransmission_task_id =
1619       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1620     return GNUNET_OK;
1621   default:
1622     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1623                GNUNET_i2s (&socket->other_peer),
1624                GNUNET_i2s (&socket->other_peer), socket->state);
1625     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1626     return GNUNET_SYSERR;
1627   }
1628 }
1629
1630
1631 /**
1632  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1633  *
1634  * @param cls the socket (set from GNUNET_MESH_connect)
1635  * @param tunnel connection to the other end
1636  * @param tunnel_ctx this is NULL
1637  * @param sender who sent the message
1638  * @param message the actual message
1639  * @param atsi performance data for the connection
1640  * @return GNUNET_OK to keep the connection open,
1641  *         GNUNET_SYSERR to close it (signal serious error)
1642  */
1643 static int
1644 client_handle_reset (void *cls,
1645                      struct GNUNET_MESH_Tunnel *tunnel,
1646                      void **tunnel_ctx,
1647                      const struct GNUNET_PeerIdentity *sender,
1648                      const struct GNUNET_MessageHeader *message,
1649                      const struct GNUNET_ATS_Information*atsi)
1650 {
1651   // struct GNUNET_STREAM_Socket *socket = cls;
1652
1653   return GNUNET_OK;
1654 }
1655
1656
1657 /**
1658  * Frees the socket's receive buffers, marks the socket as receive closed and
1659  * calls the DataProcessor with GNUNET_STREAM_SHUTDOWN status if a read handle
1660  * is present
1661  *
1662  * @param socket the socket
1663  */
1664 static void
1665 do_receive_shutdown (struct GNUNET_STREAM_Socket *socket)
1666 {
1667   socket->receive_closed = GNUNET_YES;  
1668   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1669   socket->receive_buffer = NULL;
1670   socket->receive_buffer_size = 0;
1671   if (NULL != socket->read_handle)
1672   {
1673     GNUNET_STREAM_DataProcessor proc;
1674     void *proc_cls;
1675
1676     proc = socket->read_handle->proc;
1677     proc_cls = socket->read_handle->proc_cls;
1678     GNUNET_STREAM_read_cancel (socket->read_handle);
1679     socket->read_handle = NULL;
1680     if (NULL != proc)
1681       proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
1682   }
1683 }
1684
1685
1686 /**
1687  * Marks the socket as transmit closed and calls the CompletionContinuation with
1688  * GNUNET_STREAM_SHUTDOWN status if a write handle is present
1689  *
1690  * @param socket the socket
1691  */
1692 static void
1693 do_transmit_shutdown (struct GNUNET_STREAM_Socket *socket)
1694 {
1695   socket->transmit_closed = GNUNET_YES;
1696   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1697      that that stream has been shutdown */
1698   if (NULL != socket->write_handle)
1699   {
1700     GNUNET_STREAM_CompletionContinuation wc;
1701     void *wc_cls;
1702
1703     wc = socket->write_handle->write_cont;
1704     wc_cls = socket->write_handle->write_cont_cls;
1705     GNUNET_STREAM_write_cancel (socket->write_handle);
1706     socket->write_handle = NULL;
1707     if (NULL != wc)
1708       wc (wc_cls,
1709           GNUNET_STREAM_SHUTDOWN, 0);
1710   }
1711 }
1712
1713
1714 /**
1715  * Common message handler for handling TRANSMIT_CLOSE messages
1716  *
1717  * @param socket the socket through which the ack was received
1718  * @param tunnel connection to the other end
1719  * @param sender who sent the message
1720  * @param msg the transmit close message
1721  * @param atsi performance data for the connection
1722  * @return GNUNET_OK to keep the connection open,
1723  *         GNUNET_SYSERR to close it (signal serious error)
1724  */
1725 static int
1726 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1727                        struct GNUNET_MESH_Tunnel *tunnel,
1728                        const struct GNUNET_PeerIdentity *sender,
1729                        const struct GNUNET_STREAM_MessageHeader *msg,
1730                        const struct GNUNET_ATS_Information*atsi)
1731 {
1732   struct GNUNET_STREAM_MessageHeader *reply;
1733
1734   switch (socket->state)
1735   {
1736   case STATE_INIT:
1737   case STATE_LISTEN:
1738   case STATE_HELLO_WAIT:
1739     LOG (GNUNET_ERROR_TYPE_DEBUG,
1740          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1741          GNUNET_i2s (&socket->other_peer));
1742     return GNUNET_OK;
1743   default:
1744     break;
1745   }
1746   /* Send TRANSMIT_CLOSE_ACK */
1747   reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1748   reply->header.type = 
1749       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1750   reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1751   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1752   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1753        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1754   switch(socket->state)
1755   {
1756   case STATE_RECEIVE_CLOSED:
1757   case STATE_RECEIVE_CLOSE_WAIT:
1758   case STATE_CLOSE_WAIT:
1759   case STATE_CLOSED:
1760     return GNUNET_OK;
1761   default:
1762     break;
1763   }
1764   do_receive_shutdown (socket);
1765   if (GNUNET_YES == socket->transmit_closed)
1766     socket->state = STATE_CLOSED;
1767   else
1768     socket->state = STATE_RECEIVE_CLOSED;
1769   return GNUNET_OK;
1770 }
1771
1772
1773 /**
1774  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1775  *
1776  * @param cls the socket (set from GNUNET_MESH_connect)
1777  * @param tunnel connection to the other end
1778  * @param tunnel_ctx this is NULL
1779  * @param sender who sent the message
1780  * @param message the actual message
1781  * @param atsi performance data for the connection
1782  * @return GNUNET_OK to keep the connection open,
1783  *         GNUNET_SYSERR to close it (signal serious error)
1784  */
1785 static int
1786 client_handle_transmit_close (void *cls,
1787                               struct GNUNET_MESH_Tunnel *tunnel,
1788                               void **tunnel_ctx,
1789                               const struct GNUNET_PeerIdentity *sender,
1790                               const struct GNUNET_MessageHeader *message,
1791                               const struct GNUNET_ATS_Information*atsi)
1792 {
1793   struct GNUNET_STREAM_Socket *socket = cls;
1794   
1795   return handle_transmit_close (socket,
1796                                 tunnel,
1797                                 sender,
1798                                 (struct GNUNET_STREAM_MessageHeader *)message,
1799                                 atsi);
1800 }
1801
1802
1803 /**
1804  * Task for calling the shutdown continuation callback
1805  *
1806  * @param cls the socket
1807  * @param tc the scheduler task context
1808  */
1809 static void
1810 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1811 {
1812   struct GNUNET_STREAM_Socket *socket = cls;
1813   
1814   GNUNET_assert (NULL != socket->shutdown_handle);
1815   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1816   if (NULL != socket->shutdown_handle->completion_cb)
1817     socket->shutdown_handle->completion_cb
1818         (socket->shutdown_handle->completion_cls,
1819          socket->shutdown_handle->operation);
1820   GNUNET_free (socket->shutdown_handle);
1821   socket->shutdown_handle = NULL;
1822 }
1823
1824
1825 /**
1826  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1827  *
1828  * @param socket the socket
1829  * @param tunnel connection to the other end
1830  * @param sender who sent the message
1831  * @param message the actual message
1832  * @param atsi performance data for the connection
1833  * @param operation the close operation which is being ACK'ed
1834  * @return GNUNET_OK to keep the connection open,
1835  *         GNUNET_SYSERR to close it (signal serious error)
1836  */
1837 static int
1838 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1839                           struct GNUNET_MESH_Tunnel *tunnel,
1840                           const struct GNUNET_PeerIdentity *sender,
1841                           const struct GNUNET_STREAM_MessageHeader *message,
1842                           const struct GNUNET_ATS_Information *atsi,
1843                           int operation)
1844 {
1845   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1846
1847   shutdown_handle = socket->shutdown_handle;
1848   if (NULL == shutdown_handle)
1849   {
1850     /* This may happen when the shudown handle is cancelled */
1851     LOG (GNUNET_ERROR_TYPE_DEBUG,
1852          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1853          GNUNET_i2s (&socket->other_peer));
1854     return GNUNET_OK;
1855   }
1856   switch (operation)
1857   {
1858   case SHUT_RDWR:
1859     switch (socket->state)
1860     {
1861     case STATE_CLOSE_WAIT:
1862       if (SHUT_RDWR != shutdown_handle->operation)
1863       {
1864         LOG (GNUNET_ERROR_TYPE_DEBUG,
1865              "%s: Received CLOSE_ACK when shutdown handle is not for "
1866              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1867         return GNUNET_OK;
1868       }
1869       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1870            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1871       socket->state = STATE_CLOSED;
1872       break;
1873     default:
1874       LOG (GNUNET_ERROR_TYPE_DEBUG,
1875            "%s: Received CLOSE_ACK when it is not expected\n",
1876            GNUNET_i2s (&socket->other_peer));
1877       return GNUNET_OK;
1878     }
1879     break;
1880   case SHUT_RD:
1881     switch (socket->state)
1882     {
1883     case STATE_RECEIVE_CLOSE_WAIT:
1884       if (SHUT_RD != shutdown_handle->operation)
1885       {
1886         LOG (GNUNET_ERROR_TYPE_DEBUG,
1887              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1888              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1889         return GNUNET_OK;
1890       }
1891       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1892            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1893       socket->state = STATE_RECEIVE_CLOSED;
1894       break;
1895     default:
1896       LOG (GNUNET_ERROR_TYPE_DEBUG,
1897            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1898            GNUNET_i2s (&socket->other_peer));
1899       return GNUNET_OK;
1900     }
1901     break;
1902   case SHUT_WR:
1903     switch (socket->state)
1904     {
1905     case STATE_TRANSMIT_CLOSE_WAIT:
1906       if (SHUT_WR != shutdown_handle->operation)
1907       {
1908         LOG (GNUNET_ERROR_TYPE_DEBUG,
1909              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1910              "is not for SHUT_WR\n",
1911              GNUNET_i2s (&socket->other_peer));
1912         return GNUNET_OK;
1913       }
1914       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1915            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1916       socket->state = STATE_TRANSMIT_CLOSED;
1917       break;
1918     default:
1919       LOG (GNUNET_ERROR_TYPE_DEBUG,
1920            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1921            GNUNET_i2s (&socket->other_peer));          
1922       return GNUNET_OK;
1923     }
1924     break;
1925   default:
1926     GNUNET_assert (0);
1927   }
1928   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1929       (&call_cont_task, socket);
1930   if (GNUNET_SCHEDULER_NO_TASK
1931       != shutdown_handle->close_msg_retransmission_task_id)
1932   {
1933     GNUNET_SCHEDULER_cancel
1934       (shutdown_handle->close_msg_retransmission_task_id);
1935     shutdown_handle->close_msg_retransmission_task_id =
1936         GNUNET_SCHEDULER_NO_TASK;
1937   }
1938   return GNUNET_OK;
1939 }
1940
1941
1942 /**
1943  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1944  *
1945  * @param cls the socket (set from GNUNET_MESH_connect)
1946  * @param tunnel connection to the other end
1947  * @param tunnel_ctx this is NULL
1948  * @param sender who sent the message
1949  * @param message the actual message
1950  * @param atsi performance data for the connection
1951  * @return GNUNET_OK to keep the connection open,
1952  *         GNUNET_SYSERR to close it (signal serious error)
1953  */
1954 static int
1955 client_handle_transmit_close_ack (void *cls,
1956                                   struct GNUNET_MESH_Tunnel *tunnel,
1957                                   void **tunnel_ctx,
1958                                   const struct GNUNET_PeerIdentity *sender,
1959                                   const struct GNUNET_MessageHeader *message,
1960                                   const struct GNUNET_ATS_Information*atsi)
1961 {
1962   struct GNUNET_STREAM_Socket *socket = cls;
1963
1964   return handle_generic_close_ack (socket,
1965                                    tunnel,
1966                                    sender,
1967                                    (const struct GNUNET_STREAM_MessageHeader *)
1968                                    message,
1969                                    atsi,
1970                                    SHUT_WR);
1971 }
1972
1973
1974 /**
1975  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1976  *
1977  * @param socket the socket
1978  * @param tunnel connection to the other end
1979  * @param sender who sent the message
1980  * @param message the actual message
1981  * @param atsi performance data for the connection
1982  * @return GNUNET_OK to keep the connection open,
1983  *         GNUNET_SYSERR to close it (signal serious error)
1984  */
1985 static int
1986 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1987                       struct GNUNET_MESH_Tunnel *tunnel,
1988                       const struct GNUNET_PeerIdentity *sender,
1989                       const struct GNUNET_STREAM_MessageHeader *message,
1990                       const struct GNUNET_ATS_Information *atsi)
1991 {
1992   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1993
1994   switch (socket->state)
1995   {
1996   case STATE_INIT:
1997   case STATE_LISTEN:
1998   case STATE_HELLO_WAIT:
1999     LOG (GNUNET_ERROR_TYPE_DEBUG,
2000          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2001          GNUNET_i2s (&socket->other_peer));
2002     return GNUNET_OK;
2003   default:
2004     break;
2005   }  
2006   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
2007        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));  
2008   receive_close_ack =
2009     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2010   receive_close_ack->header.size =
2011     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2012   receive_close_ack->header.type =
2013     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
2014   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
2015   switch (socket->state)
2016   {
2017   case STATE_TRANSMIT_CLOSED:
2018   case STATE_TRANSMIT_CLOSE_WAIT:
2019   case STATE_CLOSED:
2020   case STATE_CLOSE_WAIT:
2021     return GNUNET_OK;
2022   default:
2023     break;
2024   }
2025   do_transmit_shutdown (socket);
2026   if (GNUNET_YES == socket->receive_closed)
2027     socket->state = STATE_CLOSED;
2028   else
2029     socket->state = STATE_TRANSMIT_CLOSED;
2030   return GNUNET_OK;
2031 }
2032
2033
2034 /**
2035  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2036  *
2037  * @param cls the socket (set from GNUNET_MESH_connect)
2038  * @param tunnel connection to the other end
2039  * @param tunnel_ctx this is NULL
2040  * @param sender who sent the message
2041  * @param message the actual message
2042  * @param atsi performance data for the connection
2043  * @return GNUNET_OK to keep the connection open,
2044  *         GNUNET_SYSERR to close it (signal serious error)
2045  */
2046 static int
2047 client_handle_receive_close (void *cls,
2048                              struct GNUNET_MESH_Tunnel *tunnel,
2049                              void **tunnel_ctx,
2050                              const struct GNUNET_PeerIdentity *sender,
2051                              const struct GNUNET_MessageHeader *message,
2052                              const struct GNUNET_ATS_Information*atsi)
2053 {
2054   struct GNUNET_STREAM_Socket *socket = cls;
2055
2056   return
2057     handle_receive_close (socket,
2058                           tunnel,
2059                           sender,
2060                           (const struct GNUNET_STREAM_MessageHeader *) message,
2061                           atsi);
2062 }
2063
2064
2065 /**
2066  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2067  *
2068  * @param cls the socket (set from GNUNET_MESH_connect)
2069  * @param tunnel connection to the other end
2070  * @param tunnel_ctx this is NULL
2071  * @param sender who sent the message
2072  * @param message the actual message
2073  * @param atsi performance data for the connection
2074  * @return GNUNET_OK to keep the connection open,
2075  *         GNUNET_SYSERR to close it (signal serious error)
2076  */
2077 static int
2078 client_handle_receive_close_ack (void *cls,
2079                                  struct GNUNET_MESH_Tunnel *tunnel,
2080                                  void **tunnel_ctx,
2081                                  const struct GNUNET_PeerIdentity *sender,
2082                                  const struct GNUNET_MessageHeader *message,
2083                                  const struct GNUNET_ATS_Information*atsi)
2084 {
2085   struct GNUNET_STREAM_Socket *socket = cls;
2086
2087   return handle_generic_close_ack (socket,
2088                                    tunnel,
2089                                    sender,
2090                                    (const struct GNUNET_STREAM_MessageHeader *)
2091                                    message,
2092                                    atsi,
2093                                    SHUT_RD);
2094 }
2095
2096
2097 /**
2098  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2099  *
2100  * @param socket the socket
2101  * @param tunnel connection to the other end
2102  * @param sender who sent the message
2103  * @param message the actual message
2104  * @param atsi performance data for the connection
2105  * @return GNUNET_OK to keep the connection open,
2106  *         GNUNET_SYSERR to close it (signal serious error)
2107  */
2108 static int
2109 handle_close (struct GNUNET_STREAM_Socket *socket,
2110               struct GNUNET_MESH_Tunnel *tunnel,
2111               const struct GNUNET_PeerIdentity *sender,
2112               const struct GNUNET_STREAM_MessageHeader *message,
2113               const struct GNUNET_ATS_Information*atsi)
2114 {
2115   struct GNUNET_STREAM_MessageHeader *close_ack;
2116
2117   switch (socket->state)
2118   {
2119   case STATE_INIT:
2120   case STATE_LISTEN:
2121   case STATE_HELLO_WAIT:
2122     LOG (GNUNET_ERROR_TYPE_DEBUG,
2123          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2124          GNUNET_i2s (&socket->other_peer));
2125     return GNUNET_OK;
2126   default:
2127     break;
2128   }
2129   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2130        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2131   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2132   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2133   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2134   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2135   if ((STATE_CLOSED == socket->state) || (STATE_CLOSE_WAIT == socket->state))
2136     return GNUNET_OK;
2137   if (GNUNET_NO == socket->transmit_closed)
2138     do_transmit_shutdown (socket);
2139   if (GNUNET_NO == socket->receive_closed)
2140     do_receive_shutdown (socket);
2141   return GNUNET_OK;
2142 }
2143
2144
2145 /**
2146  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2147  *
2148  * @param cls the socket (set from GNUNET_MESH_connect)
2149  * @param tunnel connection to the other end
2150  * @param tunnel_ctx this is NULL
2151  * @param sender who sent the message
2152  * @param message the actual message
2153  * @param atsi performance data for the connection
2154  * @return GNUNET_OK to keep the connection open,
2155  *         GNUNET_SYSERR to close it (signal serious error)
2156  */
2157 static int
2158 client_handle_close (void *cls,
2159                      struct GNUNET_MESH_Tunnel *tunnel,
2160                      void **tunnel_ctx,
2161                      const struct GNUNET_PeerIdentity *sender,
2162                      const struct GNUNET_MessageHeader *message,
2163                      const struct GNUNET_ATS_Information*atsi)
2164 {
2165   struct GNUNET_STREAM_Socket *socket = cls;
2166
2167   return handle_close (socket,
2168                        tunnel,
2169                        sender,
2170                        (const struct GNUNET_STREAM_MessageHeader *) message,
2171                        atsi);
2172 }
2173
2174
2175 /**
2176  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2177  *
2178  * @param cls the socket (set from GNUNET_MESH_connect)
2179  * @param tunnel connection to the other end
2180  * @param tunnel_ctx this is NULL
2181  * @param sender who sent the message
2182  * @param message the actual message
2183  * @param atsi performance data for the connection
2184  * @return GNUNET_OK to keep the connection open,
2185  *         GNUNET_SYSERR to close it (signal serious error)
2186  */
2187 static int
2188 client_handle_close_ack (void *cls,
2189                          struct GNUNET_MESH_Tunnel *tunnel,
2190                          void **tunnel_ctx,
2191                          const struct GNUNET_PeerIdentity *sender,
2192                          const struct GNUNET_MessageHeader *message,
2193                          const struct GNUNET_ATS_Information *atsi)
2194 {
2195   struct GNUNET_STREAM_Socket *socket = cls;
2196
2197   return handle_generic_close_ack (socket,
2198                                    tunnel,
2199                                    sender,
2200                                    (const struct GNUNET_STREAM_MessageHeader *) 
2201                                    message,
2202                                    atsi,
2203                                    SHUT_RDWR);
2204 }
2205
2206 /*****************************/
2207 /* Server's Message Handlers */
2208 /*****************************/
2209
2210 /**
2211  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2212  *
2213  * @param cls the closure
2214  * @param tunnel connection to the other end
2215  * @param tunnel_ctx the socket
2216  * @param sender who sent the message
2217  * @param message the actual message
2218  * @param atsi performance data for the connection
2219  * @return GNUNET_OK to keep the connection open,
2220  *         GNUNET_SYSERR to close it (signal serious error)
2221  */
2222 static int
2223 server_handle_data (void *cls,
2224                     struct GNUNET_MESH_Tunnel *tunnel,
2225                     void **tunnel_ctx,
2226                     const struct GNUNET_PeerIdentity *sender,
2227                     const struct GNUNET_MessageHeader *message,
2228                     const struct GNUNET_ATS_Information*atsi)
2229 {
2230   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2231
2232   return handle_data (socket,
2233                       tunnel,
2234                       sender,
2235                       (const struct GNUNET_STREAM_DataMessage *)message,
2236                       atsi);
2237 }
2238
2239
2240 /**
2241  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2242  *
2243  * @param cls the closure
2244  * @param tunnel connection to the other end
2245  * @param tunnel_ctx the socket
2246  * @param sender who sent the message
2247  * @param message the actual message
2248  * @param atsi performance data for the connection
2249  * @return GNUNET_OK to keep the connection open,
2250  *         GNUNET_SYSERR to close it (signal serious error)
2251  */
2252 static int
2253 server_handle_hello (void *cls,
2254                      struct GNUNET_MESH_Tunnel *tunnel,
2255                      void **tunnel_ctx,
2256                      const struct GNUNET_PeerIdentity *sender,
2257                      const struct GNUNET_MessageHeader *message,
2258                      const struct GNUNET_ATS_Information*atsi)
2259 {
2260   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2261   struct GNUNET_STREAM_HelloAckMessage *reply;
2262
2263   if (0 != memcmp (sender,
2264                    &socket->other_peer,
2265                    sizeof (struct GNUNET_PeerIdentity)))
2266   {
2267     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2268                GNUNET_i2s (&socket->other_peer));
2269     return GNUNET_YES;
2270   }
2271   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2272   GNUNET_assert (socket->tunnel == tunnel);
2273   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2274              GNUNET_i2s (&socket->other_peer));
2275   switch (socket->state)
2276   {
2277   case STATE_INIT:
2278     reply = generate_hello_ack (socket, GNUNET_YES);
2279     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2280                    GNUNET_NO);
2281     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2282                    socket->control_retransmission_task_id);
2283     socket->control_retransmission_task_id =
2284       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2285                                     &control_retransmission_task, socket);
2286     break;
2287   case STATE_HELLO_WAIT:
2288     /* Perhaps our HELLO_ACK was lost */
2289     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2290                    socket->control_retransmission_task_id);
2291     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2292     socket->control_retransmission_task_id =
2293       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2294     break;
2295   default:
2296     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2297                GNUNET_i2s (&socket->other_peer), socket->state);
2298     /* FIXME: Send RESET? */
2299   }
2300   return GNUNET_OK;
2301 }
2302
2303
2304 /**
2305  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2306  *
2307  * @param cls the closure
2308  * @param tunnel connection to the other end
2309  * @param tunnel_ctx the socket
2310  * @param sender who sent the message
2311  * @param message the actual message
2312  * @param atsi performance data for the connection
2313  * @return GNUNET_OK to keep the connection open,
2314  *         GNUNET_SYSERR to close it (signal serious error)
2315  */
2316 static int
2317 server_handle_hello_ack (void *cls,
2318                          struct GNUNET_MESH_Tunnel *tunnel,
2319                          void **tunnel_ctx,
2320                          const struct GNUNET_PeerIdentity *sender,
2321                          const struct GNUNET_MessageHeader *message,
2322                          const struct GNUNET_ATS_Information*atsi)
2323 {
2324   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2325   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2326
2327   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2328                  ntohs (message->type));
2329   GNUNET_assert (socket->tunnel == tunnel);
2330   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2331   switch (socket->state)  
2332   {
2333   case STATE_HELLO_WAIT:
2334     LOG (GNUNET_ERROR_TYPE_DEBUG,
2335          "%s: Received HELLO_ACK from %s\n",
2336          GNUNET_i2s (&socket->other_peer),
2337          GNUNET_i2s (&socket->other_peer));
2338     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2339     LOG (GNUNET_ERROR_TYPE_DEBUG,
2340          "%s: Read sequence number %u\n",
2341          GNUNET_i2s (&socket->other_peer),
2342          (unsigned int) socket->read_sequence_number);
2343     socket->receiver_window_available = 
2344       ntohl (ack_message->receiver_window_size);
2345     set_state_established (NULL, socket);
2346     break;
2347   default:
2348     LOG (GNUNET_ERROR_TYPE_DEBUG,
2349          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2350   }
2351   return GNUNET_OK;
2352 }
2353
2354
2355 /**
2356  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2357  *
2358  * @param cls the closure
2359  * @param tunnel connection to the other end
2360  * @param tunnel_ctx the socket
2361  * @param sender who sent the message
2362  * @param message the actual message
2363  * @param atsi performance data for the connection
2364  * @return GNUNET_OK to keep the connection open,
2365  *         GNUNET_SYSERR to close it (signal serious error)
2366  */
2367 static int
2368 server_handle_reset (void *cls,
2369                      struct GNUNET_MESH_Tunnel *tunnel,
2370                      void **tunnel_ctx,
2371                      const struct GNUNET_PeerIdentity *sender,
2372                      const struct GNUNET_MessageHeader *message,
2373                      const struct GNUNET_ATS_Information*atsi)
2374 {
2375   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2376
2377   return GNUNET_OK;
2378 }
2379
2380
2381 /**
2382  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2383  *
2384  * @param cls the closure
2385  * @param tunnel connection to the other end
2386  * @param tunnel_ctx the socket
2387  * @param sender who sent the message
2388  * @param message the actual message
2389  * @param atsi performance data for the connection
2390  * @return GNUNET_OK to keep the connection open,
2391  *         GNUNET_SYSERR to close it (signal serious error)
2392  */
2393 static int
2394 server_handle_transmit_close (void *cls,
2395                               struct GNUNET_MESH_Tunnel *tunnel,
2396                               void **tunnel_ctx,
2397                               const struct GNUNET_PeerIdentity *sender,
2398                               const struct GNUNET_MessageHeader *message,
2399                               const struct GNUNET_ATS_Information*atsi)
2400 {
2401   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2402
2403   return handle_transmit_close (socket,
2404                                 tunnel,
2405                                 sender,
2406                                 (struct GNUNET_STREAM_MessageHeader *)message,
2407                                 atsi);
2408 }
2409
2410
2411 /**
2412  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2413  *
2414  * @param cls the closure
2415  * @param tunnel connection to the other end
2416  * @param tunnel_ctx the socket
2417  * @param sender who sent the message
2418  * @param message the actual message
2419  * @param atsi performance data for the connection
2420  * @return GNUNET_OK to keep the connection open,
2421  *         GNUNET_SYSERR to close it (signal serious error)
2422  */
2423 static int
2424 server_handle_transmit_close_ack (void *cls,
2425                                   struct GNUNET_MESH_Tunnel *tunnel,
2426                                   void **tunnel_ctx,
2427                                   const struct GNUNET_PeerIdentity *sender,
2428                                   const struct GNUNET_MessageHeader *message,
2429                                   const struct GNUNET_ATS_Information*atsi)
2430 {
2431   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2432
2433   return handle_generic_close_ack (socket,
2434                                    tunnel,
2435                                    sender,
2436                                    (const struct GNUNET_STREAM_MessageHeader *)
2437                                    message,
2438                                    atsi,
2439                                    SHUT_WR);
2440 }
2441
2442
2443 /**
2444  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2445  *
2446  * @param cls the closure
2447  * @param tunnel connection to the other end
2448  * @param tunnel_ctx the socket
2449  * @param sender who sent the message
2450  * @param message the actual message
2451  * @param atsi performance data for the connection
2452  * @return GNUNET_OK to keep the connection open,
2453  *         GNUNET_SYSERR to close it (signal serious error)
2454  */
2455 static int
2456 server_handle_receive_close (void *cls,
2457                              struct GNUNET_MESH_Tunnel *tunnel,
2458                              void **tunnel_ctx,
2459                              const struct GNUNET_PeerIdentity *sender,
2460                              const struct GNUNET_MessageHeader *message,
2461                              const struct GNUNET_ATS_Information*atsi)
2462 {
2463   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2464
2465   return
2466     handle_receive_close (socket,
2467                           tunnel,
2468                           sender,
2469                           (const struct GNUNET_STREAM_MessageHeader *) message,
2470                           atsi);
2471 }
2472
2473
2474 /**
2475  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2476  *
2477  * @param cls the closure
2478  * @param tunnel connection to the other end
2479  * @param tunnel_ctx the socket
2480  * @param sender who sent the message
2481  * @param message the actual message
2482  * @param atsi performance data for the connection
2483  * @return GNUNET_OK to keep the connection open,
2484  *         GNUNET_SYSERR to close it (signal serious error)
2485  */
2486 static int
2487 server_handle_receive_close_ack (void *cls,
2488                                  struct GNUNET_MESH_Tunnel *tunnel,
2489                                  void **tunnel_ctx,
2490                                  const struct GNUNET_PeerIdentity *sender,
2491                                  const struct GNUNET_MessageHeader *message,
2492                                  const struct GNUNET_ATS_Information*atsi)
2493 {
2494   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2495
2496   return handle_generic_close_ack (socket,
2497                                    tunnel,
2498                                    sender,
2499                                    (const struct GNUNET_STREAM_MessageHeader *)
2500                                    message,
2501                                    atsi,
2502                                    SHUT_RD);
2503 }
2504
2505
2506 /**
2507  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2508  *
2509  * @param cls the listen socket (from GNUNET_MESH_connect in
2510  *          GNUNET_STREAM_listen) 
2511  * @param tunnel connection to the other end
2512  * @param tunnel_ctx the socket
2513  * @param sender who sent the message
2514  * @param message the actual message
2515  * @param atsi performance data for the connection
2516  * @return GNUNET_OK to keep the connection open,
2517  *         GNUNET_SYSERR to close it (signal serious error)
2518  */
2519 static int
2520 server_handle_close (void *cls,
2521                      struct GNUNET_MESH_Tunnel *tunnel,
2522                      void **tunnel_ctx,
2523                      const struct GNUNET_PeerIdentity *sender,
2524                      const struct GNUNET_MessageHeader *message,
2525                      const struct GNUNET_ATS_Information*atsi)
2526 {
2527   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2528   
2529   return handle_close (socket,
2530                        tunnel,
2531                        sender,
2532                        (const struct GNUNET_STREAM_MessageHeader *) message,
2533                        atsi);
2534 }
2535
2536
2537 /**
2538  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2539  *
2540  * @param cls the closure
2541  * @param tunnel connection to the other end
2542  * @param tunnel_ctx the socket
2543  * @param sender who sent the message
2544  * @param message the actual message
2545  * @param atsi performance data for the connection
2546  * @return GNUNET_OK to keep the connection open,
2547  *         GNUNET_SYSERR to close it (signal serious error)
2548  */
2549 static int
2550 server_handle_close_ack (void *cls,
2551                          struct GNUNET_MESH_Tunnel *tunnel,
2552                          void **tunnel_ctx,
2553                          const struct GNUNET_PeerIdentity *sender,
2554                          const struct GNUNET_MessageHeader *message,
2555                          const struct GNUNET_ATS_Information*atsi)
2556 {
2557   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2558
2559   return handle_generic_close_ack (socket,
2560                                    tunnel,
2561                                    sender,
2562                                    (const struct GNUNET_STREAM_MessageHeader *) 
2563                                    message,
2564                                    atsi,
2565                                    SHUT_RDWR);
2566 }
2567
2568
2569 /**
2570  * Handler for DATA_ACK messages
2571  *
2572  * @param socket the socket through which the ack was received
2573  * @param tunnel connection to the other end
2574  * @param sender who sent the message
2575  * @param ack the acknowledgment message
2576  * @param atsi performance data for the connection
2577  * @return GNUNET_OK to keep the connection open,
2578  *         GNUNET_SYSERR to close it (signal serious error)
2579  */
2580 static int
2581 handle_ack (struct GNUNET_STREAM_Socket *socket,
2582             struct GNUNET_MESH_Tunnel *tunnel,
2583             const struct GNUNET_PeerIdentity *sender,
2584             const struct GNUNET_STREAM_AckMessage *ack,
2585             const struct GNUNET_ATS_Information*atsi)
2586 {
2587   struct GNUNET_STREAM_WriteHandle *write_handle;
2588   uint64_t ack_bitmap;
2589   unsigned int packet;
2590   int need_retransmission;
2591   uint32_t sequence_difference;
2592
2593   if (0 != memcmp (sender,
2594                    &socket->other_peer,
2595                    sizeof (struct GNUNET_PeerIdentity)))
2596   {
2597     LOG (GNUNET_ERROR_TYPE_DEBUG,
2598          "%s: Received ACK from non-confirming peer\n",
2599          GNUNET_i2s (&socket->other_peer));
2600     return GNUNET_YES;
2601   }
2602   switch (socket->state)
2603   {
2604   case (STATE_ESTABLISHED):
2605   case (STATE_RECEIVE_CLOSED):
2606   case (STATE_RECEIVE_CLOSE_WAIT):
2607     if (NULL == socket->write_handle)
2608     {
2609       LOG (GNUNET_ERROR_TYPE_DEBUG,
2610            "%s: Received DATA_ACK when write_handle is NULL\n",
2611            GNUNET_i2s (&socket->other_peer));
2612       return GNUNET_OK;
2613     }
2614     sequence_difference = 
2615         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2616     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2617     {
2618       LOG (GNUNET_ERROR_TYPE_DEBUG,
2619            "%s: Received DATA_ACK with unexpected base sequence number\n",
2620            GNUNET_i2s (&socket->other_peer));
2621       LOG (GNUNET_ERROR_TYPE_DEBUG,
2622            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2623            GNUNET_i2s (&socket->other_peer),
2624            socket->write_sequence_number,
2625            ntohl (ack->base_sequence_number));
2626       return GNUNET_OK;
2627     }
2628     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2629          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2630     /* Cancel the retransmission task */
2631     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2632     {
2633       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2634       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2635       socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
2636     }
2637     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2638     {
2639       if (NULL == socket->write_handle->messages[packet])
2640         break;
2641       /* BS: Base sequence from ack; PS: sequence num of current packet */
2642       sequence_difference = ntohl (ack->base_sequence_number)
2643         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2644       if (0 == sequence_difference)
2645         break; /* The message in our handle is not yet received */
2646       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2647       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2648       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2649                             packet, GNUNET_YES);
2650     }
2651     if (((ntohl (ack->base_sequence_number)
2652          - (socket->write_handle->max_ack_base_num))
2653           <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2654     {
2655       socket->write_handle->max_ack_base_num = ntohl (ack->base_sequence_number);
2656       socket->receiver_window_available = 
2657           ntohl (ack->receive_window_remaining);
2658     }
2659     else
2660       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2661                   "Ignoring to modify receive window available as base: %u, max_ack_base: %u\n",
2662                   ntohl (ack->base_sequence_number),
2663                    socket->write_handle->max_ack_base_num);
2664     if ((packet == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2665         || ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2666             && (NULL == socket->write_handle->messages[packet])))
2667       goto call_write_cont_cb;
2668     GNUNET_assert (ntohl
2669                    (socket->write_handle->messages[packet]->sequence_number)
2670                    == ntohl (ack->base_sequence_number));
2671     /* Update our bitmap */
2672     ack_bitmap = GNUNET_ntohll (ack->bitmap);
2673     for (; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2674     {
2675       if (NULL == socket->write_handle->messages[packet]) break;
2676       if (ackbitmap_is_bit_set (&ack_bitmap, ntohl
2677                                 (socket->write_handle->messages[packet]->sequence_number)
2678                                 - ntohl (ack->base_sequence_number)))
2679         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet, GNUNET_YES);
2680     }
2681     /* Check if we have received all acknowledgements */
2682     need_retransmission = GNUNET_NO;
2683     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2684     {
2685       if (NULL == socket->write_handle->messages[packet]) break;
2686       if (GNUNET_YES != ackbitmap_is_bit_set 
2687           (&socket->write_handle->ack_bitmap,packet))
2688       {
2689         need_retransmission = GNUNET_YES;
2690         break;
2691       }
2692     }
2693     if (GNUNET_YES == need_retransmission)
2694     {
2695       write_data (socket);
2696       return GNUNET_OK;
2697     }
2698
2699   call_write_cont_cb:
2700     /* Free the packets */
2701     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2702     {
2703       GNUNET_free_non_null (socket->write_handle->messages[packet]);
2704     }
2705     write_handle = socket->write_handle;
2706     socket->write_handle = NULL;
2707     if (NULL != write_handle->write_cont)
2708       write_handle->write_cont (write_handle->write_cont_cls,
2709                                 GNUNET_STREAM_OK,
2710                                 write_handle->size);
2711     /* We are done with the write handle - Freeing it */
2712     GNUNET_free (write_handle);
2713     LOG (GNUNET_ERROR_TYPE_DEBUG,
2714          "%s: Write completion callback completed\n",
2715          GNUNET_i2s (&socket->other_peer));      
2716     break;
2717   default:
2718     break;
2719   }
2720   return GNUNET_OK;
2721 }
2722
2723
2724 /**
2725  * Handler for DATA_ACK messages
2726  *
2727  * @param cls the 'struct GNUNET_STREAM_Socket'
2728  * @param tunnel connection to the other end
2729  * @param tunnel_ctx unused
2730  * @param sender who sent the message
2731  * @param message the actual message
2732  * @param atsi performance data for the connection
2733  * @return GNUNET_OK to keep the connection open,
2734  *         GNUNET_SYSERR to close it (signal serious error)
2735  */
2736 static int
2737 client_handle_ack (void *cls,
2738                    struct GNUNET_MESH_Tunnel *tunnel,
2739                    void **tunnel_ctx,
2740                    const struct GNUNET_PeerIdentity *sender,
2741                    const struct GNUNET_MessageHeader *message,
2742                    const struct GNUNET_ATS_Information*atsi)
2743 {
2744   struct GNUNET_STREAM_Socket *socket = cls;
2745   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2746  
2747   return handle_ack (socket, tunnel, sender, ack, atsi);
2748 }
2749
2750
2751 /**
2752  * Handler for DATA_ACK messages
2753  *
2754  * @param cls the server's listen socket
2755  * @param tunnel connection to the other end
2756  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2757  * @param sender who sent the message
2758  * @param message the actual message
2759  * @param atsi performance data for the connection
2760  * @return GNUNET_OK to keep the connection open,
2761  *         GNUNET_SYSERR to close it (signal serious error)
2762  */
2763 static int
2764 server_handle_ack (void *cls,
2765                    struct GNUNET_MESH_Tunnel *tunnel,
2766                    void **tunnel_ctx,
2767                    const struct GNUNET_PeerIdentity *sender,
2768                    const struct GNUNET_MessageHeader *message,
2769                    const struct GNUNET_ATS_Information*atsi)
2770 {
2771   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2772   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2773  
2774   return handle_ack (socket, tunnel, sender, ack, atsi);
2775 }
2776
2777
2778 /**
2779  * For client message handlers, the stream socket is in the
2780  * closure argument.
2781  */
2782 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2783   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2784   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2785    sizeof (struct GNUNET_STREAM_AckMessage) },
2786   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2787    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2788   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2789    sizeof (struct GNUNET_STREAM_MessageHeader)},
2790   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2791    sizeof (struct GNUNET_STREAM_MessageHeader)},
2792   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2793    sizeof (struct GNUNET_STREAM_MessageHeader)},
2794   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2795    sizeof (struct GNUNET_STREAM_MessageHeader)},
2796   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2797    sizeof (struct GNUNET_STREAM_MessageHeader)},
2798   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2799    sizeof (struct GNUNET_STREAM_MessageHeader)},
2800   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2801    sizeof (struct GNUNET_STREAM_MessageHeader)},
2802   {NULL, 0, 0}
2803 };
2804
2805
2806 /**
2807  * For server message handlers, the stream socket is in the
2808  * tunnel context, and the listen socket in the closure argument.
2809  */
2810 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2811   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2812   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2813    sizeof (struct GNUNET_STREAM_AckMessage) },
2814   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2815    sizeof (struct GNUNET_STREAM_MessageHeader)},
2816   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2817    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2818   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2819    sizeof (struct GNUNET_STREAM_MessageHeader)},
2820   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2821    sizeof (struct GNUNET_STREAM_MessageHeader)},
2822   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2823    sizeof (struct GNUNET_STREAM_MessageHeader)},
2824   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2825    sizeof (struct GNUNET_STREAM_MessageHeader)},
2826   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2827    sizeof (struct GNUNET_STREAM_MessageHeader)},
2828   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2829    sizeof (struct GNUNET_STREAM_MessageHeader)},
2830   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2831    sizeof (struct GNUNET_STREAM_MessageHeader)},
2832   {NULL, 0, 0}
2833 };
2834
2835
2836 /**
2837  * Function called when our target peer is connected to our tunnel
2838  *
2839  * @param cls the socket for which this tunnel is created
2840  * @param peer the peer identity of the target
2841  * @param atsi performance data for the connection
2842  */
2843 static void
2844 mesh_peer_connect_callback (void *cls,
2845                             const struct GNUNET_PeerIdentity *peer,
2846                             const struct GNUNET_ATS_Information * atsi)
2847 {
2848   struct GNUNET_STREAM_Socket *socket = cls;
2849   struct GNUNET_STREAM_MessageHeader *message;
2850   
2851   if (0 != memcmp (peer,
2852                    &socket->other_peer,
2853                    sizeof (struct GNUNET_PeerIdentity)))
2854   {
2855     LOG (GNUNET_ERROR_TYPE_DEBUG,
2856          "%s: A peer which is not our target has connected to our tunnel\n",
2857          GNUNET_i2s(peer));
2858     return;
2859   }
2860   LOG (GNUNET_ERROR_TYPE_DEBUG,
2861        "%s: Target peer %s connected\n",
2862        GNUNET_i2s (&socket->other_peer),
2863        GNUNET_i2s (&socket->other_peer));
2864   /* Set state to INIT */
2865   socket->state = STATE_INIT;
2866   /* Send HELLO message */
2867   message = generate_hello ();
2868   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2869   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2870                  socket->control_retransmission_task_id);
2871   socket->control_retransmission_task_id =
2872     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2873                                   &control_retransmission_task, socket);
2874 }
2875
2876
2877 /**
2878  * Function called when our target peer is disconnected from our tunnel
2879  *
2880  * @param cls the socket associated which this tunnel
2881  * @param peer the peer identity of the target
2882  */
2883 static void
2884 mesh_peer_disconnect_callback (void *cls,
2885                                const struct GNUNET_PeerIdentity *peer)
2886 {
2887   struct GNUNET_STREAM_Socket *socket=cls;
2888   
2889   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2890   LOG (GNUNET_ERROR_TYPE_DEBUG,
2891        "%s: Other peer %s disconnected \n",
2892        GNUNET_i2s (&socket->other_peer),
2893        GNUNET_i2s (&socket->other_peer));
2894 }
2895
2896
2897 /**
2898  * Method called whenever a peer creates a tunnel to us
2899  *
2900  * @param cls closure
2901  * @param tunnel new handle to the tunnel
2902  * @param initiator peer that started the tunnel
2903  * @param atsi performance information for the tunnel
2904  * @return initial tunnel context for the tunnel
2905  *         (can be NULL -- that's not an error)
2906  */
2907 static void *
2908 new_tunnel_notify (void *cls,
2909                    struct GNUNET_MESH_Tunnel *tunnel,
2910                    const struct GNUNET_PeerIdentity *initiator,
2911                    const struct GNUNET_ATS_Information *atsi)
2912 {
2913   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2914   struct GNUNET_STREAM_Socket *socket;
2915
2916   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2917      from the same peer again until the socket is closed */
2918
2919   if (GNUNET_NO == lsocket->listening)
2920   {
2921     GNUNET_MESH_tunnel_destroy (tunnel);
2922     return NULL;
2923   }
2924   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2925   socket->other_peer = *initiator;
2926   socket->tunnel = tunnel;
2927   socket->state = STATE_INIT;
2928   socket->lsocket = lsocket;
2929   socket->stat_handle = lsocket->stat_handle;
2930   socket->retransmit_timeout = lsocket->retransmit_timeout;
2931   socket->testing_active = lsocket->testing_active;
2932   socket->testing_set_write_sequence_number_value =
2933       lsocket->testing_set_write_sequence_number_value;
2934   socket->max_payload_size = lsocket->max_payload_size;
2935   LOG (GNUNET_ERROR_TYPE_DEBUG,
2936        "%s: Peer %s initiated tunnel to us\n", 
2937        GNUNET_i2s (&socket->other_peer),
2938        GNUNET_i2s (&socket->other_peer));
2939   if (NULL != socket->stat_handle)
2940   {
2941     GNUNET_STATISTICS_update (socket->stat_handle,
2942                               "total inbound connections received",
2943                               1, GNUNET_NO);
2944     GNUNET_STATISTICS_update (socket->stat_handle,
2945                               "inbound connections", 1, GNUNET_NO);
2946   }
2947   
2948   return socket;
2949 }
2950
2951
2952 /**
2953  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2954  * any associated state.  This function is NOT called if the client has
2955  * explicitly asked for the tunnel to be destroyed using
2956  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2957  * the tunnel.
2958  *
2959  * @param cls closure (set from GNUNET_MESH_connect)
2960  * @param tunnel connection to the other end (henceforth invalid)
2961  * @param tunnel_ctx place where local state associated
2962  *                   with the tunnel is stored
2963  */
2964 static void 
2965 tunnel_cleaner (void *cls,
2966                 const struct GNUNET_MESH_Tunnel *tunnel,
2967                 void *tunnel_ctx)
2968 {
2969   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2970   struct MessageQueue *head;
2971
2972   GNUNET_assert (tunnel == socket->tunnel);
2973   GNUNET_break_op(0);
2974   LOG (GNUNET_ERROR_TYPE_DEBUG,
2975        "%s: Peer %s has terminated connection abruptly\n",
2976        GNUNET_i2s (&socket->other_peer),
2977        GNUNET_i2s (&socket->other_peer));
2978   if (NULL != socket->stat_handle)
2979   {
2980     GNUNET_STATISTICS_update (socket->stat_handle,
2981                               "connections terminated abruptly", 1, GNUNET_NO);
2982     GNUNET_STATISTICS_update (socket->stat_handle,
2983                               "inbound connections", -1, GNUNET_NO);
2984   }
2985   /* Clear Transmit handles */
2986   if (NULL != socket->transmit_handle)
2987   {
2988     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2989     socket->transmit_handle = NULL;
2990   }
2991   /* Stop Tasks using socket->tunnel */
2992   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2993   {
2994     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2995     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2996   }
2997   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2998   {
2999     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3000     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3001   }  
3002   /* Terminate the control retransmission tasks */
3003   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3004   {
3005     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3006     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3007   }
3008   /* Clear existing message queue */
3009   while (NULL != (head = socket->queue_head)) {
3010     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3011                                  socket->queue_tail,
3012                                  head);
3013     GNUNET_free (head->message);
3014     GNUNET_free (head);
3015   }
3016   socket->tunnel = NULL;
3017 }
3018
3019
3020 /**
3021  * Callback to signal timeout on lockmanager lock acquire
3022  *
3023  * @param cls the ListenSocket
3024  * @param tc the scheduler task context
3025  */
3026 static void
3027 lockmanager_acquire_timeout (void *cls, 
3028                              const struct GNUNET_SCHEDULER_TaskContext *tc)
3029 {
3030   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
3031   GNUNET_STREAM_ListenCallback listen_cb;
3032   void *listen_cb_cls;
3033
3034   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
3035   listen_cb = lsocket->listen_cb;
3036   listen_cb_cls = lsocket->listen_cb_cls;
3037   if (NULL != listen_cb)
3038     listen_cb (listen_cb_cls, NULL, NULL);
3039 }
3040
3041
3042 /**
3043  * Callback to notify us on the status changes on app_port lock
3044  *
3045  * @param cls the ListenSocket
3046  * @param domain the domain name of the lock
3047  * @param lock the app_port
3048  * @param status the current status of the lock
3049  */
3050 static void
3051 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
3052                        enum GNUNET_LOCKMANAGER_Status status)
3053 {
3054   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
3055
3056   GNUNET_assert (lock == (uint32_t) lsocket->port);
3057   if (GNUNET_LOCKMANAGER_SUCCESS == status)
3058   {
3059     lsocket->listening = GNUNET_YES;
3060     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3061     {
3062       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3063       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
3064     }
3065     if (NULL == lsocket->mesh)
3066     {
3067       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
3068
3069       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
3070                                            lsocket, /* Closure */
3071                                            &new_tunnel_notify,
3072                                            &tunnel_cleaner,
3073                                            server_message_handlers,
3074                                            ports);
3075       GNUNET_assert (NULL != lsocket->mesh);
3076       if (NULL != lsocket->listen_ok_cb)
3077       {
3078         (void) lsocket->listen_ok_cb ();
3079       }
3080     }
3081   }
3082   if (GNUNET_LOCKMANAGER_RELEASE == status)
3083     lsocket->listening = GNUNET_NO;
3084 }
3085
3086
3087 /*****************/
3088 /* API functions */
3089 /*****************/
3090
3091
3092 /**
3093  * Tries to open a stream to the target peer
3094  *
3095  * @param cfg configuration to use
3096  * @param target the target peer to which the stream has to be opened
3097  * @param app_port the application port number which uniquely identifies this
3098  *            stream
3099  * @param open_cb this function will be called after stream has be established;
3100  *          cannot be NULL
3101  * @param open_cb_cls the closure for open_cb
3102  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3103  * @return if successful it returns the stream socket; NULL if stream cannot be
3104  *         opened 
3105  */
3106 struct GNUNET_STREAM_Socket *
3107 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3108                     const struct GNUNET_PeerIdentity *target,
3109                     GNUNET_MESH_ApplicationType app_port,
3110                     GNUNET_STREAM_OpenCallback open_cb,
3111                     void *open_cb_cls,
3112                     ...)
3113 {
3114   struct GNUNET_STREAM_Socket *socket;
3115   enum GNUNET_STREAM_Option option;
3116   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3117   va_list vargs;
3118   uint16_t payload_size;
3119
3120   LOG (GNUNET_ERROR_TYPE_DEBUG,
3121        "%s\n", __func__);
3122   GNUNET_assert (NULL != open_cb);
3123   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3124   socket->other_peer = *target;
3125   socket->open_cb = open_cb;
3126   socket->open_cls = open_cb_cls;
3127   /* Set defaults */
3128   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3129   socket->testing_active = GNUNET_NO;
3130   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3131   va_start (vargs, open_cb_cls); /* Parse variable args */
3132   do {
3133     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3134     switch (option)
3135     {
3136     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3137       /* Expect struct GNUNET_TIME_Relative */
3138       socket->retransmit_timeout = va_arg (vargs,
3139                                            struct GNUNET_TIME_Relative);
3140       break;
3141     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3142       socket->testing_active = GNUNET_YES;
3143       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3144                                                                 uint32_t);
3145       break;
3146     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3147       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3148       break;
3149     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3150       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3151       break;
3152     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3153       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3154       GNUNET_assert (0 != payload_size);
3155       if (payload_size < socket->max_payload_size)
3156         socket->max_payload_size = payload_size;
3157       break;
3158     case GNUNET_STREAM_OPTION_END:
3159       break;
3160     }
3161   } while (GNUNET_STREAM_OPTION_END != option);
3162   va_end (vargs);               /* End of variable args parsing */
3163   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3164                                       socket, /* cls */
3165                                       NULL, /* No inbound tunnel handler */
3166                                       NULL, /* No in-tunnel cleaner */
3167                                       client_message_handlers,
3168                                       ports); /* We don't get inbound tunnels */
3169   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3170   {
3171     GNUNET_free (socket);
3172     return NULL;
3173   }
3174   /* Now create the mesh tunnel to target */
3175   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3176   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3177                                               NULL, /* Tunnel context */
3178                                               &mesh_peer_connect_callback,
3179                                               &mesh_peer_disconnect_callback,
3180                                               socket);
3181   GNUNET_assert (NULL != socket->tunnel);
3182   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3183                                         &socket->other_peer);
3184   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3185   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3186   return socket;
3187 }
3188
3189
3190 /**
3191  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3192  *
3193  * @param socket the stream socket
3194  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3195  * @param completion_cb the callback that will be called upon successful
3196  *          shutdown of given operation
3197  * @param completion_cls the closure for the completion callback
3198  * @return the shutdown handle
3199  */
3200 struct GNUNET_STREAM_ShutdownHandle *
3201 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3202                         int operation,
3203                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3204                         void *completion_cls)
3205 {
3206   struct GNUNET_STREAM_ShutdownHandle *handle;
3207   struct GNUNET_STREAM_MessageHeader *msg;
3208   
3209   GNUNET_assert (NULL == socket->shutdown_handle);
3210   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3211   handle->socket = socket;
3212   handle->completion_cb = completion_cb;
3213   handle->completion_cls = completion_cls;
3214   socket->shutdown_handle = handle;
3215   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3216        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3217        || ((GNUNET_YES == socket->transmit_closed) 
3218            && (GNUNET_YES == socket->receive_closed)
3219            && (SHUT_RDWR == operation)) )
3220   {
3221     handle->operation = operation;
3222     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3223                                                           socket);
3224     return handle;
3225   }
3226   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3227   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3228   switch (operation)
3229   {
3230   case SHUT_RD:
3231     handle->operation = SHUT_RD;
3232     if (NULL != socket->read_handle)
3233       LOG (GNUNET_ERROR_TYPE_WARNING,
3234            "Existing read handle should be cancelled before shutting"
3235            " down reading\n");
3236     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3237     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3238                    GNUNET_NO);
3239     socket->receive_closed = GNUNET_YES;
3240     break;
3241   case SHUT_WR:
3242     handle->operation = SHUT_WR;
3243     if (NULL != socket->write_handle)
3244       LOG (GNUNET_ERROR_TYPE_WARNING,
3245            "Existing write handle should be cancelled before shutting"
3246            " down writing\n");
3247     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3248     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3249                    GNUNET_NO);
3250     socket->transmit_closed = GNUNET_YES;
3251     break;
3252   case SHUT_RDWR:
3253     handle->operation = SHUT_RDWR;
3254     if (NULL != socket->write_handle)
3255       LOG (GNUNET_ERROR_TYPE_WARNING,
3256            "Existing write handle should be cancelled before shutting"
3257            " down writing\n");
3258     if (NULL != socket->read_handle)
3259       LOG (GNUNET_ERROR_TYPE_WARNING,
3260            "Existing read handle should be cancelled before shutting"
3261            " down reading\n");
3262     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3263     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3264     socket->transmit_closed = GNUNET_YES;
3265     socket->receive_closed = GNUNET_YES;
3266     break;
3267   default:
3268     LOG (GNUNET_ERROR_TYPE_WARNING,
3269          "GNUNET_STREAM_shutdown called with invalid value for "
3270          "parameter operation -- Ignoring\n");
3271     GNUNET_free (msg);
3272     GNUNET_free (handle);
3273     return NULL;
3274   }
3275   handle->close_msg_retransmission_task_id =
3276     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3277                                   &close_msg_retransmission_task,
3278                                   handle);
3279   return handle;
3280 }
3281
3282
3283 /**
3284  * Cancels a pending shutdown. Note that the shutdown messages may already be
3285  * sent and the stream is shutdown already for the operation given to
3286  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3287  * shutdown messages and frees the shutdown handle.
3288  *
3289  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3290  */
3291 void
3292 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3293 {
3294   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3295     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3296   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3297     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3298   handle->socket->shutdown_handle = NULL;
3299   GNUNET_free (handle);
3300 }
3301
3302
3303 /**
3304  * Closes the stream
3305  *
3306  * @param socket the stream socket
3307  */
3308 void
3309 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3310 {
3311   struct MessageQueue *head;
3312
3313   if (NULL != socket->read_handle)
3314   {
3315     LOG (GNUNET_ERROR_TYPE_WARNING,
3316          "Closing STREAM socket when a read handle is pending\n");
3317     GNUNET_STREAM_read_cancel (socket->read_handle);
3318   }
3319   if (NULL != socket->write_handle)
3320   {
3321     LOG (GNUNET_ERROR_TYPE_WARNING,
3322          "Closing STREAM socket when a write handle is pending\n");
3323     GNUNET_STREAM_write_cancel (socket->write_handle);
3324     //socket->write_handle = NULL;
3325   }
3326   /* Terminate the ack'ing task if they are still present */
3327   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3328   {
3329     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3330     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3331   }
3332   /* Terminate the control retransmission tasks */
3333   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3334   {
3335     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3336   }
3337   /* Clear Transmit handles */
3338   if (NULL != socket->transmit_handle)
3339   {
3340     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3341     socket->transmit_handle = NULL;
3342   }
3343   /* Clear existing message queue */
3344   while (NULL != (head = socket->queue_head)) {
3345     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3346                                  socket->queue_tail,
3347                                  head);
3348     GNUNET_free (head->message);
3349     GNUNET_free (head);
3350   }
3351   /* Close associated tunnel */
3352   if (NULL != socket->tunnel)
3353   {
3354     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3355     socket->tunnel = NULL;
3356   }
3357   /* Close mesh connection */
3358   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3359   {
3360     GNUNET_MESH_disconnect (socket->mesh);
3361     socket->mesh = NULL;
3362   }
3363   /* Close statistics connection */
3364   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3365     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3366   /* Release receive buffer */
3367   if (NULL != socket->receive_buffer)
3368   {
3369     GNUNET_free (socket->receive_buffer);
3370   }
3371   GNUNET_free (socket);
3372 }
3373
3374
3375 /**
3376  * Listens for stream connections for a specific application ports
3377  *
3378  * @param cfg the configuration to use
3379  * @param app_port the application port for which new streams will be accepted
3380  * @param listen_cb this function will be called when a peer tries to establish
3381  *            a stream with us
3382  * @param listen_cb_cls closure for listen_cb
3383  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3384  * @return listen socket, NULL for any error
3385  */
3386 struct GNUNET_STREAM_ListenSocket *
3387 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3388                       GNUNET_MESH_ApplicationType app_port,
3389                       GNUNET_STREAM_ListenCallback listen_cb,
3390                       void *listen_cb_cls,
3391                       ...)
3392 {
3393   struct GNUNET_STREAM_ListenSocket *lsocket;
3394   struct GNUNET_TIME_Relative listen_timeout;
3395   enum GNUNET_STREAM_Option option;
3396   va_list vargs;
3397   uint16_t payload_size;
3398
3399   GNUNET_assert (NULL != listen_cb);
3400   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3401   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3402   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3403   if (NULL == lsocket->lockmanager)
3404   {
3405     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3406     GNUNET_free (lsocket);
3407     return NULL;
3408   }
3409   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3410   /* Set defaults */
3411   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3412   lsocket->testing_active = GNUNET_NO;
3413   lsocket->listen_ok_cb = NULL;
3414   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3415   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3416   va_start (vargs, listen_cb_cls);
3417   do {
3418     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3419     switch (option)
3420     {
3421     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3422       lsocket->retransmit_timeout = va_arg (vargs,
3423                                             struct GNUNET_TIME_Relative);
3424       break;
3425     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3426       lsocket->testing_active = GNUNET_YES;
3427       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3428                                                                  uint32_t);
3429       break;
3430     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3431       listen_timeout = GNUNET_TIME_relative_multiply
3432         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3433       break;
3434     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3435       lsocket->listen_ok_cb = va_arg (vargs,
3436                                       GNUNET_STREAM_ListenSuccessCallback);
3437       break;
3438     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3439       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3440       GNUNET_assert (0 != payload_size);
3441       if (payload_size < lsocket->max_payload_size)
3442         lsocket->max_payload_size = payload_size;
3443       break;
3444     case GNUNET_STREAM_OPTION_END:
3445       break;
3446     }
3447   } while (GNUNET_STREAM_OPTION_END != option);
3448   va_end (vargs);
3449   lsocket->port = app_port;
3450   lsocket->listen_cb = listen_cb;
3451   lsocket->listen_cb_cls = listen_cb_cls;
3452   lsocket->locking_request = 
3453     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3454                                      (uint32_t) lsocket->port,
3455                                      &lock_status_change_cb, lsocket);
3456   lsocket->lockmanager_acquire_timeout_task =
3457     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3458                                   &lockmanager_acquire_timeout, lsocket);
3459   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3460                                                    lsocket->cfg);
3461   return lsocket;
3462 }
3463
3464
3465 /**
3466  * Closes the listen socket
3467  *
3468  * @param lsocket the listen socket
3469  */
3470 void
3471 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3472 {
3473   /* Close MESH connection */
3474   if (NULL != lsocket->mesh)
3475     GNUNET_MESH_disconnect (lsocket->mesh);
3476   if (NULL != lsocket->stat_handle)
3477     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3478   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3479   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3480     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3481   if (NULL != lsocket->locking_request)
3482     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3483   if (NULL != lsocket->lockmanager)
3484     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3485   GNUNET_free (lsocket);
3486 }
3487
3488
3489 /**
3490  * Tries to write the given data to the stream. The maximum size of data that
3491  * can be written per a write operation is ~ 4MB (64 * (64000 - sizeof (struct
3492  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3493  * violation, however only the said number of maximum bytes will be written.
3494  *
3495  * @param socket the socket representing a stream
3496  * @param data the data buffer from where the data is written into the stream
3497  * @param size the number of bytes to be written from the data buffer
3498  * @param timeout the timeout period
3499  * @param write_cont the function to call upon writing some bytes into the
3500  *          stream 
3501  * @param write_cont_cls the closure
3502  *
3503  * @return handle to cancel the operation; if a previous write is pending NULL
3504  *           is returned. If the stream has been shutdown for this operation or
3505  *           is broken then write_cont is immediately called and NULL is
3506  *           returned.
3507  */
3508 struct GNUNET_STREAM_WriteHandle *
3509 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3510                      const void *data,
3511                      size_t size,
3512                      struct GNUNET_TIME_Relative timeout,
3513                      GNUNET_STREAM_CompletionContinuation write_cont,
3514                      void *write_cont_cls)
3515 {
3516   struct GNUNET_STREAM_WriteHandle *io_handle;
3517   struct GNUNET_STREAM_DataMessage *data_msg;
3518   const void *sweep;
3519   struct GNUNET_TIME_Relative ack_deadline;
3520   unsigned int num_needed_packets;
3521   unsigned int packet;
3522   uint32_t packet_size;
3523   uint32_t payload_size;
3524   uint16_t max_data_packet_size;
3525
3526   LOG (GNUNET_ERROR_TYPE_DEBUG,
3527        "%s\n", __func__);
3528   if (NULL != socket->write_handle)
3529   {
3530     GNUNET_break (0);
3531     return NULL;
3532   }
3533   if (NULL == socket->tunnel)
3534   {
3535     if (NULL != write_cont)
3536       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3537     return NULL;
3538   }
3539   switch (socket->state)
3540   {
3541   case STATE_TRANSMIT_CLOSED:
3542   case STATE_TRANSMIT_CLOSE_WAIT:
3543   case STATE_CLOSED:
3544   case STATE_CLOSE_WAIT:
3545     if (NULL != write_cont)
3546       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3547     LOG (GNUNET_ERROR_TYPE_DEBUG,
3548          "%s() END\n", __func__);
3549     return NULL;
3550   case STATE_INIT:
3551   case STATE_LISTEN:
3552   case STATE_HELLO_WAIT:
3553     if (NULL != write_cont)
3554       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3555     LOG (GNUNET_ERROR_TYPE_DEBUG,
3556          "%s() END\n", __func__);
3557     return NULL;
3558   case STATE_ESTABLISHED:
3559   case STATE_RECEIVE_CLOSED:
3560   case STATE_RECEIVE_CLOSE_WAIT:
3561     break;
3562   }
3563   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3564     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3565   num_needed_packets =
3566       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3567   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_WriteHandle));
3568   io_handle->socket = socket;
3569   io_handle->write_cont = write_cont;
3570   io_handle->write_cont_cls = write_cont_cls;
3571   io_handle->size = size;
3572   io_handle->packets_sent = 0;
3573   sweep = data;
3574   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3575      determined from RTT */
3576   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3577   /* Divide the given buffer into packets for sending */
3578   max_data_packet_size =
3579       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3580   io_handle->max_ack_base_num = socket->write_sequence_number;
3581   for (packet=0; packet < num_needed_packets; packet++)
3582   {
3583     if ((packet + 1) * socket->max_payload_size < size) 
3584     {
3585       payload_size = socket->max_payload_size;
3586       packet_size = max_data_packet_size;
3587     }
3588     else 
3589     {
3590       payload_size = size - packet * socket->max_payload_size;
3591       packet_size = 
3592           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3593     }
3594     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3595     io_handle->messages[packet]->header.header.size = htons (packet_size);
3596     io_handle->messages[packet]->header.header.type =
3597       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3598     io_handle->messages[packet]->sequence_number =
3599       htonl (socket->write_sequence_number++);
3600     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3601     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3602        determined from RTT */
3603     io_handle->messages[packet]->ack_deadline =
3604       GNUNET_TIME_relative_hton (ack_deadline);
3605     data_msg = io_handle->messages[packet];
3606     /* Copy data from given buffer to the packet */
3607     memcpy (&data_msg[1], sweep, payload_size);
3608     sweep += payload_size;
3609     socket->write_offset += payload_size;
3610   }
3611   /* ack the last data message. FIXME: remove when we figure out how to do this
3612      using RTT */
3613   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3614       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3615   socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
3616   socket->write_handle = io_handle;
3617   write_data (socket);
3618   LOG (GNUNET_ERROR_TYPE_DEBUG,
3619        "%s() END\n", __func__);
3620   return io_handle;
3621 }
3622
3623
3624 /**
3625  * Function to check the ACK bitmap for any received messages and call the data processor
3626  *
3627  * @param cls the socket
3628  * @param tc the scheduler task context
3629  */
3630 static void
3631 probe_data_availability (void *cls,
3632                          const struct GNUNET_SCHEDULER_TaskContext *tc)
3633 {
3634   struct GNUNET_STREAM_Socket *socket = cls;
3635
3636   GNUNET_assert (NULL != socket->read_handle);
3637   socket->read_handle->probe_data_availability_task_id =
3638       GNUNET_SCHEDULER_NO_TASK;
3639   if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id)
3640     return;     /* A task to call read processor is present */
3641   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3642                                           0))
3643     socket->read_handle->read_task_id 
3644         = GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
3645 }
3646
3647
3648
3649 /**
3650  * Tries to read data from the stream. Should not be called when another read
3651  * handle is present; the existing read handle should be canceled with
3652  * GNUNET_STREAM_read_cancel(). Only one read handle per socket is present at
3653  * any time
3654  *
3655  * @param socket the socket representing a stream
3656  * @param timeout the timeout period
3657  * @param proc function to call with data (once only)
3658  * @param proc_cls the closure for proc
3659  * @return handle to cancel the operation; NULL is returned if the stream has
3660  *           been shutdown for this type of opeartion (the DataProcessor is
3661  *           immediately called with GNUNET_STREAM_SHUTDOWN as status)
3662  */
3663 struct GNUNET_STREAM_ReadHandle *
3664 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3665                     struct GNUNET_TIME_Relative timeout,
3666                     GNUNET_STREAM_DataProcessor proc,
3667                     void *proc_cls)
3668 {
3669   struct GNUNET_STREAM_ReadHandle *read_handle;
3670   
3671   LOG (GNUNET_ERROR_TYPE_DEBUG,
3672        "%s: %s()\n", 
3673        GNUNET_i2s (&socket->other_peer),
3674        __func__);
3675   /* Only one read handle is permitted at any time; cancel the existing or wait
3676      for it to complete */
3677   GNUNET_assert (NULL == socket->read_handle);
3678   GNUNET_assert (NULL != proc);
3679   if (GNUNET_YES == socket->receive_closed)
3680     return NULL;
3681   switch (socket->state)
3682   {
3683   case STATE_RECEIVE_CLOSED:
3684   case STATE_RECEIVE_CLOSE_WAIT:
3685   case STATE_CLOSED:
3686   case STATE_CLOSE_WAIT:
3687     LOG (GNUNET_ERROR_TYPE_DEBUG,
3688          "%s: %s() END\n",
3689          GNUNET_i2s (&socket->other_peer),
3690          __func__);
3691     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3692     return NULL;
3693   default:
3694     break;
3695   }
3696   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ReadHandle));
3697   read_handle->proc = proc;
3698   read_handle->proc_cls = proc_cls;
3699   read_handle->socket = socket;
3700   socket->read_handle = read_handle;
3701   read_handle->probe_data_availability_task_id =
3702       GNUNET_SCHEDULER_add_now (&probe_data_availability, socket);
3703   read_handle->read_io_timeout_task_id =
3704       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3705   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3706        GNUNET_i2s (&socket->other_peer), __func__);
3707   return read_handle;
3708 }
3709
3710
3711 /**
3712  * Cancels pending write operation. Also cancels packet retransmissions which
3713  * may have resulted otherwise.
3714  *
3715  * CAUTION: Normally a write operation is considered successful if the data
3716  * given to it is sent and acknowledged by the receiver. As data is divided
3717  * into packets, it is possible that not all packets are received by the
3718  * receiver. Any missing packets are then retransmitted till the receiver
3719  * acknowledges all packets or until a timeout . During this scenario if the
3720  * write operation is cancelled all such retransmissions are also
3721  * cancelled. This may leave the receiver's receive buffer incompletely filled
3722  * as some missing packets are never retransmitted. So this operation should be
3723  * used before shutting down transmission from our side or before closing the
3724  * socket.
3725  *
3726  * @param wh write operation handle to cancel
3727  */
3728 void
3729 GNUNET_STREAM_write_cancel (struct GNUNET_STREAM_WriteHandle *wh)
3730 {
3731   struct GNUNET_STREAM_Socket *socket = wh->socket;
3732   unsigned int packet;
3733
3734   GNUNET_assert (NULL != socket->write_handle);
3735   GNUNET_assert (socket->write_handle == wh);
3736   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3737   {
3738     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3739     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3740   }
3741   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3742   {
3743     if (NULL == wh->messages[packet]) break;
3744     GNUNET_free (wh->messages[packet]);
3745   }      
3746   GNUNET_free (socket->write_handle);
3747   socket->write_handle = NULL;
3748 }
3749
3750
3751 /**
3752  * Cancel pending read operation.
3753  *
3754  * @param rh read operation handle to cancel
3755  */
3756 void
3757 GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
3758 {
3759   struct GNUNET_STREAM_Socket *socket;
3760   
3761   socket = rh->socket;
3762   GNUNET_assert (NULL != socket->read_handle);
3763   GNUNET_assert (rh == socket->read_handle);
3764   cleanup_read_handle (socket);
3765 }
3766
3767 /* end of stream_api.c */