- make api compile
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_util_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_WriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_ReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * Mesh transmit timeout
285    */
286   struct GNUNET_TIME_Relative mesh_retry_timeout;
287
288   /**
289    * Data retransmission timeout
290    */
291   struct GNUNET_TIME_Relative data_retransmit_timeout;
292
293   /**
294    * The state of the protocol associated with this socket
295    */
296   enum State state;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * Is receive closed
305    */
306   int receive_closed;
307
308   /**
309    * Is transmission closed
310    */
311   int transmit_closed;
312
313   /**
314    * The application port number (type: uint32_t)
315    */
316   GNUNET_MESH_ApplicationType port;
317
318   /**
319    * The write sequence number to be set incase of testing
320    */
321   uint32_t testing_set_write_sequence_number_value;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363
364   /**
365    * The maximum size of the data message payload this stream handle can send
366    */
367   uint16_t max_payload_size;
368 };
369
370
371 /**
372  * A socket for listening
373  */
374 struct GNUNET_STREAM_ListenSocket
375 {
376   /**
377    * The mesh handle
378    */
379   struct GNUNET_MESH_Handle *mesh;
380
381   /**
382    * Handle to statistics
383    */
384   struct GNUNET_STATISTICS_Handle *stat_handle;
385
386   /**
387    * Our configuration
388    */
389   struct GNUNET_CONFIGURATION_Handle *cfg;
390
391   /**
392    * Handle to the lock manager service
393    */
394   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
395
396   /**
397    * The active LockingRequest from lockmanager
398    */
399   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
400
401   /**
402    * Callback to call after acquring a lock and listening
403    */
404   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
405
406   /**
407    * The callback function which is called after successful opening socket
408    */
409   GNUNET_STREAM_ListenCallback listen_cb;
410
411   /**
412    * The call back closure
413    */
414   void *listen_cb_cls;
415
416   /**
417    * The service port
418    */
419   GNUNET_MESH_ApplicationType port;
420   
421   /**
422    * The id of the lockmanager timeout task
423    */
424   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
425
426   /**
427    * The retransmit timeout
428    */
429   struct GNUNET_TIME_Relative retransmit_timeout;
430   
431   /**
432    * Listen enabled?
433    */
434   int listening;
435
436   /**
437    * Whether testing mode is active or not
438    */
439   int testing_active;
440
441   /**
442    * The write sequence number to be set incase of testing
443    */
444   uint32_t testing_set_write_sequence_number_value;
445
446   /**
447    * The maximum size of the data message payload this stream handle can send
448    */
449   uint16_t max_payload_size;
450
451 };
452
453
454 /**
455  * The IO Write Handle
456  */
457 struct GNUNET_STREAM_WriteHandle
458 {
459   /**
460    * The socket to which this write handle is associated
461    */
462   struct GNUNET_STREAM_Socket *socket;
463
464   /**
465    * The write continuation callback
466    */
467   GNUNET_STREAM_CompletionContinuation write_cont;
468
469   /**
470    * Write continuation closure
471    */
472   void *write_cont_cls;
473
474   /**
475    * The packet_buffers associated with this Handle
476    */
477   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
478
479   /**
480    * The bitmap of this IOHandle; Corresponding bit for a message is set when
481    * it has been acknowledged by the receiver
482    */
483   GNUNET_STREAM_AckBitmap ack_bitmap;
484
485   /**
486    * Number of bytes in this write handle
487    */
488   size_t size;
489
490   /**
491    * Number of packets already transmitted from this IO handle. Retransmitted
492    * packets are not taken into account here. This is used to determine which
493    * packets account for retransmission and which packets occupy buffer space at
494    * the receiver.
495    */
496   unsigned int packets_sent;
497
498   /**
499    * The maximum of the base numbers of the received acks
500    */
501   uint32_t max_ack_base_num;
502
503 };
504
505
506 /**
507  * The IO Read Handle
508  */
509 struct GNUNET_STREAM_ReadHandle
510 {
511   /**
512    * The socket to which this read handle is associated
513    */
514   struct GNUNET_STREAM_Socket *socket;
515   
516   /**
517    * Callback for the read processor
518    */
519   GNUNET_STREAM_DataProcessor proc;
520
521   /**
522    * The closure pointer for the read processor callback
523    */
524   void *proc_cls;
525
526   /**
527    * Task identifier for the read io timeout task
528    */
529   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
530
531   /**
532    * Task scheduled to continue a read operation.
533    */
534   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
535
536   /**
537    * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call
538    * the read processor task
539    */
540   GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id;
541 };
542
543
544 /**
545  * Handle for Shutdown
546  */
547 struct GNUNET_STREAM_ShutdownHandle
548 {
549   /**
550    * The socket associated with this shutdown handle
551    */
552   struct GNUNET_STREAM_Socket *socket;
553
554   /**
555    * Shutdown completion callback
556    */
557   GNUNET_STREAM_ShutdownCompletion completion_cb;
558
559   /**
560    * Closure for completion callback
561    */
562   void *completion_cls;
563
564   /**
565    * Close message retransmission task id
566    */
567   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
568
569   /**
570    * Task scheduled to call the shutdown continuation callback
571    */
572   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
573
574   /**
575    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
576    */
577   int operation;  
578 };
579
580
581 /**
582  * Default value in seconds for various timeouts
583  */
584 static const unsigned int default_timeout = 10;
585
586 /**
587  * The domain name for locks we use here
588  */
589 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
590
591 /**
592  * Callback function for sending queued message
593  *
594  * @param cls closure the socket
595  * @param size number of bytes available in buf
596  * @param buf where the callee should write the message
597  * @return number of bytes written to buf
598  */
599 static size_t
600 send_message_notify (void *cls, size_t size, void *buf)
601 {
602   struct GNUNET_STREAM_Socket *socket = cls;
603   struct MessageQueue *head;
604   size_t ret;
605
606   socket->transmit_handle = NULL; /* Remove the transmit handle */
607   head = socket->queue_head;
608   if (NULL == head)
609     return 0; /* just to be safe */
610   if (0 == size)                /* request timed out */
611   {
612     socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
613         (socket->mesh_retry_timeout);    
614     LOG (GNUNET_ERROR_TYPE_DEBUG,
615          "%s: Message sending to MESH timed out. Retrying in %s \n",
616          GNUNET_i2s (&socket->other_peer),
617          GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
618                                                  GNUNET_YES));
619     socket->transmit_handle = 
620         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
621                                            GNUNET_NO, /* Corking */
622                                            socket->mesh_retry_timeout,
623                                            &socket->other_peer,
624                                            ntohs (head->message->size),
625                                            &send_message_notify,
626                                            socket);
627     return 0;
628   }
629   ret = ntohs (head->message->size);
630   GNUNET_assert (size >= ret);
631   memcpy (buf, head->message, ret);
632   if (NULL != head->finish_cb)
633   {
634     head->finish_cb (head->finish_cb_cls, socket);
635   }
636   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
637                                socket->queue_tail,
638                                head);
639   GNUNET_free (head->message);
640   GNUNET_free (head);
641   if (NULL != socket->transmit_handle)
642     return ret; /* 'finish_cb' might have triggered message already! */
643   head = socket->queue_head;
644   if (NULL != head)    /* more pending messages to send */
645   {
646     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
647     socket->transmit_handle = 
648         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
649                                            GNUNET_NO, /* Corking */
650                                            socket->mesh_retry_timeout,
651                                            &socket->other_peer,
652                                            ntohs (head->message->size),
653                                            &send_message_notify,
654                                            socket);
655   }
656   return ret;
657 }
658
659
660 /**
661  * Queues a message for sending using the mesh connection of a socket
662  *
663  * @param socket the socket whose mesh connection is used
664  * @param message the message to be sent
665  * @param finish_cb the callback to be called when the message is sent
666  * @param finish_cb_cls the closure for the callback
667  * @param urgent set to GNUNET_YES to add the message to the beginning of the
668  *          queue; GNUNET_NO to add at the tail
669  */
670 static void
671 queue_message (struct GNUNET_STREAM_Socket *socket,
672                struct GNUNET_MessageHeader *message,
673                SendFinishCallback finish_cb,
674                void *finish_cb_cls,
675                int urgent)
676 {
677   struct MessageQueue *queue_entity;
678
679   GNUNET_assert ((ntohs (message->type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
680                  && (ntohs (message->type)
681                      <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
682   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Queueing message of type %d and size %d\n",
683        GNUNET_i2s (&socket->other_peer),
684        ntohs (message->type),ntohs (message->size));
685   GNUNET_assert (NULL != message);
686   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
687   queue_entity->message = message;
688   queue_entity->finish_cb = finish_cb;
689   queue_entity->finish_cb_cls = finish_cb_cls;
690   if (GNUNET_YES == urgent)
691   {
692     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
693                                  queue_entity);
694     if (NULL != socket->transmit_handle)
695     {
696       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
697       socket->transmit_handle = NULL;
698     }
699   }
700   else
701     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
702                                       socket->queue_tail,
703                                       queue_entity);
704   if (NULL == socket->transmit_handle)
705   {
706     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
707     socket->transmit_handle = 
708         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
709                                            GNUNET_NO, /* Corking */
710                                            socket->mesh_retry_timeout,
711                                            &socket->other_peer,
712                                            ntohs (message->size),
713                                            &send_message_notify,
714                                            socket);
715   }
716 }
717
718
719 /**
720  * Copies a message and queues it for sending using the mesh connection of
721  * given socket 
722  *
723  * @param socket the socket whose mesh connection is used
724  * @param message the message to be sent
725  * @param finish_cb the callback to be called when the message is sent
726  * @param finish_cb_cls the closure for the callback
727  */
728 static void
729 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
730                         const struct GNUNET_MessageHeader *message,
731                         SendFinishCallback finish_cb,
732                         void *finish_cb_cls)
733 {
734   struct GNUNET_MessageHeader *msg_copy;
735   uint16_t size;
736   
737   size = ntohs (message->size);
738   msg_copy = GNUNET_malloc (size);
739   memcpy (msg_copy, message, size);
740   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
741 }
742
743
744 /**
745  * Writes data using the given socket. The amount of data written is limited by
746  * the receiver_window_size
747  *
748  * @param socket the socket to use
749  */
750 static void 
751 write_data (struct GNUNET_STREAM_Socket *socket);
752
753
754 /**
755  * Task for retransmitting data messages if they aren't ACK before their ack
756  * deadline 
757  *
758  * @param cls the socket
759  * @param tc the Task context
760  */
761 static void
762 data_retransmission_task (void *cls,
763                           const struct GNUNET_SCHEDULER_TaskContext *tc)
764 {
765   struct GNUNET_STREAM_Socket *socket = cls;
766   
767   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
768   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
769     return;
770   LOG (GNUNET_ERROR_TYPE_DEBUG,
771        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
772   write_data (socket);
773 }
774
775
776 /**
777  * Task for sending ACK message
778  *
779  * @param cls the socket
780  * @param tc the Task context
781  */
782 static void
783 ack_task (void *cls,
784           const struct GNUNET_SCHEDULER_TaskContext *tc)
785 {
786   struct GNUNET_STREAM_Socket *socket = cls;
787   struct GNUNET_STREAM_AckMessage *ack_msg;
788
789   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
790   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
791     return;
792   /* Create the ACK Message */
793   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
794   ack_msg->header.size = htons (sizeof (struct 
795                                                GNUNET_STREAM_AckMessage));
796   ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
797   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
798   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
799   ack_msg->receive_window_remaining = 
800     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
801   /* Queue up ACK for immediate sending */
802   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
803 }
804
805
806 /**
807  * Retransmission task for shutdown messages
808  *
809  * @param cls the shutdown handle
810  * @param tc the Task Context
811  */
812 static void
813 close_msg_retransmission_task (void *cls,
814                                const struct GNUNET_SCHEDULER_TaskContext *tc)
815 {
816   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
817   struct GNUNET_MessageHeader *msg;
818   struct GNUNET_STREAM_Socket *socket;
819
820   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
821   GNUNET_assert (NULL != shutdown_handle);
822   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
823     return;
824   socket = shutdown_handle->socket;
825   msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
826   msg->size = htons (sizeof (struct GNUNET_MessageHeader));
827   switch (shutdown_handle->operation)
828   {
829   case SHUT_RDWR:
830     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
831     break;
832   case SHUT_RD:
833     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
834     break;
835   case SHUT_WR:
836     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
837     break;
838   default:
839     GNUNET_free (msg);
840     shutdown_handle->close_msg_retransmission_task_id = 
841       GNUNET_SCHEDULER_NO_TASK;
842     return;
843   }
844   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
845   shutdown_handle->close_msg_retransmission_task_id =
846     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
847                                   &close_msg_retransmission_task,
848                                   shutdown_handle);
849 }
850
851
852 /**
853  * Function to modify a bit in GNUNET_STREAM_AckBitmap
854  *
855  * @param bitmap the bitmap to modify
856  * @param bit the bit number to modify
857  * @param value GNUNET_YES to on, GNUNET_NO to off
858  */
859 static void
860 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
861                       unsigned int bit, 
862                       int value)
863 {
864   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
865   if (GNUNET_YES == value)
866     *bitmap |= (1LL << bit);
867   else
868     *bitmap &= ~(1LL << bit);
869 }
870
871
872 /**
873  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
874  *
875  * @param bitmap address of the bitmap that has to be checked
876  * @param bit the bit number to check
877  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
878  */
879 static uint8_t
880 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
881                       unsigned int bit)
882 {
883   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
884   return 0 != (*bitmap & (1LL << bit));
885 }
886
887
888 /**
889  * Writes data using the given socket. The amount of data written is limited by
890  * the receiver_window_size
891  *
892  * @param socket the socket to use
893  */
894 static void 
895 write_data (struct GNUNET_STREAM_Socket *socket)
896 {
897   struct GNUNET_STREAM_WriteHandle *io_handle = socket->write_handle;
898   unsigned int packet;
899   
900   for (packet=0; packet < io_handle->packets_sent; packet++)
901   {
902     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
903                                            packet))
904     {
905       LOG (GNUNET_ERROR_TYPE_DEBUG,
906            "%s: Retransmitting DATA message with sequence %u\n",
907            GNUNET_i2s (&socket->other_peer),
908            ntohl (io_handle->messages[packet]->sequence_number));
909       copy_and_queue_message (socket,
910                               &io_handle->messages[packet]->header,
911                               NULL,
912                               NULL);
913     }
914   }
915   /* Now send new packets if there is enough buffer space */
916   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
917          (NULL != io_handle->messages[packet]) &&
918          (socket->receiver_window_available 
919           >= ntohs (io_handle->messages[packet]->header.size)))
920   {
921     socket->receiver_window_available -= 
922       ntohs (io_handle->messages[packet]->header.size);
923     LOG (GNUNET_ERROR_TYPE_DEBUG,
924          "%s: Placing DATA message with sequence %u in send queue\n",
925          GNUNET_i2s (&socket->other_peer),
926          ntohl (io_handle->messages[packet]->sequence_number));
927     copy_and_queue_message (socket,
928                             &io_handle->messages[packet]->header,
929                             NULL,
930                             NULL);
931     packet++;
932   }
933   io_handle->packets_sent = packet;
934   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
935   {
936     socket->data_retransmit_timeout = GNUNET_TIME_STD_BACKOFF
937         (socket->data_retransmit_timeout);
938     socket->data_retransmission_task_id = 
939         GNUNET_SCHEDULER_add_delayed (socket->data_retransmit_timeout,
940                                       &data_retransmission_task,
941                                       socket);
942   }
943 }
944
945
946 /**
947  * Cleansup the sockets read handle
948  *
949  * @param socket the socket whose read handle has to be cleanedup
950  */
951 static void
952 cleanup_read_handle (struct GNUNET_STREAM_Socket *socket)
953 {
954   struct GNUNET_STREAM_ReadHandle *read_handle;
955
956   read_handle = socket->read_handle;
957   /* Read io time task should be there; if it is already executed then this
958   read handle is not valid; However upon scheduler shutdown the read io task
959   may be executed before */
960   if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id)
961     GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
962   /* reading task may be present; if so we have to stop it */
963   if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id)
964     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
965   if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id)
966     GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id);
967   GNUNET_free (read_handle);
968   socket->read_handle = NULL;
969 }
970
971
972 /**
973  * Task for calling the read processor
974  *
975  * @param cls the socket
976  * @param tc the task context
977  */
978 static void
979 call_read_processor (void *cls,
980                      const struct GNUNET_SCHEDULER_TaskContext *tc)
981 {
982   struct GNUNET_STREAM_Socket *socket = cls;
983   struct GNUNET_STREAM_ReadHandle *read_handle;
984   GNUNET_STREAM_DataProcessor proc;
985   void *proc_cls;
986   size_t read_size;
987   size_t valid_read_size;
988   unsigned int packet;
989   uint32_t sequence_increase;
990   uint32_t offset_increase;
991
992   read_handle = socket->read_handle;
993   GNUNET_assert (NULL != read_handle);
994   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
995   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
996     return;
997   if (NULL == socket->receive_buffer) 
998     return;
999   GNUNET_assert (NULL != read_handle->proc);
1000   /* Check the bitmap for any holes */
1001   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1002   {
1003     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
1004                                            packet))
1005       break;
1006   }
1007   /* We only call read processor if we have the first packet */
1008   GNUNET_assert (0 < packet);
1009   valid_read_size = 
1010     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
1011   GNUNET_assert (0 != valid_read_size);
1012   proc = read_handle->proc;
1013   proc_cls = read_handle->proc_cls;
1014   cleanup_read_handle (socket);
1015   /* Call the data processor */
1016   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
1017        GNUNET_i2s (&socket->other_peer));
1018   read_size = proc (proc_cls, GNUNET_STREAM_OK,
1019                     socket->receive_buffer + socket->copy_offset,
1020                     valid_read_size);
1021   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
1022        GNUNET_i2s (&socket->other_peer), read_size);
1023   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
1024        GNUNET_i2s (&socket->other_peer));
1025   GNUNET_assert (read_size <= valid_read_size);
1026   socket->copy_offset += read_size;
1027   /* Determine upto which packet we can remove from the buffer */
1028   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1029   {
1030     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1031     { 
1032       packet++; 
1033       break;
1034     }
1035     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1036       break;
1037   }
1038   /* If no packets can be removed we can't move the buffer */
1039   if (0 == packet) 
1040     return;
1041   sequence_increase = packet;
1042   LOG (GNUNET_ERROR_TYPE_DEBUG,
1043        "%s: Sequence increase after read processor completion: %u\n",
1044        GNUNET_i2s (&socket->other_peer), sequence_increase);
1045   /* Shift the data in the receive buffer */
1046   socket->receive_buffer = 
1047     memmove (socket->receive_buffer,
1048              socket->receive_buffer 
1049              + socket->receive_buffer_boundaries[sequence_increase-1],
1050              socket->receive_buffer_size
1051              - socket->receive_buffer_boundaries[sequence_increase-1]);
1052   /* Shift the bitmap */
1053   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1054   /* Set read_sequence_number */
1055   socket->read_sequence_number += sequence_increase;
1056   /* Set read_offset */
1057   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1058   socket->read_offset += offset_increase;
1059   /* Fix copy_offset */
1060   GNUNET_assert (offset_increase <= socket->copy_offset);
1061   socket->copy_offset -= offset_increase;
1062   /* Fix relative boundaries */
1063   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1064   {
1065     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1066     {
1067       uint32_t ahead_buffer_boundary;
1068
1069       ahead_buffer_boundary = 
1070         socket->receive_buffer_boundaries[packet + sequence_increase];
1071       if (0 == ahead_buffer_boundary)
1072         socket->receive_buffer_boundaries[packet] = 0;
1073       else
1074       {
1075         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1076         socket->receive_buffer_boundaries[packet] = 
1077           ahead_buffer_boundary - offset_increase;
1078       }
1079     }
1080     else
1081       socket->receive_buffer_boundaries[packet] = 0;
1082   }
1083 }
1084
1085
1086 /**
1087  * Cancels the existing read io handle
1088  *
1089  * @param cls the closure from the SCHEDULER call
1090  * @param tc the task context
1091  */
1092 static void
1093 read_io_timeout (void *cls,
1094                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1095 {
1096   struct GNUNET_STREAM_Socket *socket = cls;
1097   struct GNUNET_STREAM_ReadHandle *read_handle;
1098   GNUNET_STREAM_DataProcessor proc;
1099   void *proc_cls;
1100
1101   read_handle = socket->read_handle;
1102   GNUNET_assert (NULL != read_handle);
1103   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1104   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1105     return;
1106   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1107   {
1108     LOG (GNUNET_ERROR_TYPE_DEBUG,
1109          "%s: Read task timedout - Cancelling it\n",
1110          GNUNET_i2s (&socket->other_peer));
1111     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1112     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1113   }
1114   proc = read_handle->proc;
1115   proc_cls = read_handle->proc_cls;
1116   GNUNET_free (read_handle);
1117   socket->read_handle = NULL;
1118   /* Call the read processor to signal timeout */
1119   proc (proc_cls,
1120         GNUNET_STREAM_TIMEOUT,
1121         NULL,
1122         0);
1123 }
1124
1125
1126 /**
1127  * Handler for DATA messages; Same for both client and server
1128  *
1129  * @param socket the socket through which the ack was received
1130  * @param tunnel connection to the other end
1131  * @param sender who sent the message
1132  * @param msg the data message
1133  * @param atsi performance data for the connection
1134  * @return GNUNET_OK to keep the connection open,
1135  *         GNUNET_SYSERR to close it (signal serious error)
1136  */
1137 static int
1138 handle_data (struct GNUNET_STREAM_Socket *socket,
1139              struct GNUNET_MESH_Tunnel *tunnel,
1140              const struct GNUNET_PeerIdentity *sender,
1141              const struct GNUNET_STREAM_DataMessage *msg,
1142              const struct GNUNET_ATS_Information*atsi)
1143 {
1144   const void *payload;
1145   struct GNUNET_TIME_Relative ack_deadline_rel;
1146   uint32_t bytes_needed;
1147   uint32_t relative_offset;
1148   uint32_t relative_sequence_number;
1149   uint16_t size;
1150
1151   size = htons (msg->header.size);
1152   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1153   {
1154     GNUNET_break_op (0);
1155     return GNUNET_SYSERR;
1156   }
1157   if (0 != memcmp (sender, &socket->other_peer,
1158                    sizeof (struct GNUNET_PeerIdentity)))
1159   {
1160     LOG (GNUNET_ERROR_TYPE_DEBUG,
1161          "%s: Received DATA from non-confirming peer\n",
1162          GNUNET_i2s (&socket->other_peer));
1163     return GNUNET_YES;
1164   }
1165   switch (socket->state)
1166   {
1167   case STATE_ESTABLISHED:
1168   case STATE_TRANSMIT_CLOSED:
1169   case STATE_TRANSMIT_CLOSE_WAIT:      
1170     /* check if the message's sequence number is in the range we are
1171        expecting */
1172     relative_sequence_number = 
1173       ntohl (msg->sequence_number) - socket->read_sequence_number;
1174     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1175     {
1176       LOG (GNUNET_ERROR_TYPE_DEBUG,
1177            "%s: Ignoring received message with sequence number %u\n",
1178            GNUNET_i2s (&socket->other_peer),
1179            ntohl (msg->sequence_number));
1180       /* Start ACK sending task if one is not already present */
1181       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1182       {
1183         socket->ack_task_id = 
1184           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1185                                         (msg->ack_deadline),
1186                                         &ack_task,
1187                                         socket);
1188       }
1189       return GNUNET_YES;
1190     }      
1191     /* Check if we have already seen this message */
1192     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1193                                             relative_sequence_number))
1194     {
1195       LOG (GNUNET_ERROR_TYPE_DEBUG,
1196            "%s: Ignoring already received message with sequence number %u\n",
1197            GNUNET_i2s (&socket->other_peer),
1198            ntohl (msg->sequence_number));
1199       /* Start ACK sending task if one is not already present */
1200       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1201       {
1202         socket->ack_task_id = 
1203           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1204                                         (msg->ack_deadline), &ack_task, socket);
1205       }
1206       return GNUNET_YES;
1207     }
1208     LOG (GNUNET_ERROR_TYPE_DEBUG,
1209          "%1$s: Receiving DATA with sequence number: %2$u and size: %3$d from "
1210          "%1$s\n", GNUNET_i2s (&socket->other_peer),
1211          ntohl (msg->sequence_number), ntohs (msg->header.size));
1212     /* Check if we have to allocate the buffer */
1213     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1214     relative_offset = ntohl (msg->offset) - socket->read_offset;
1215     bytes_needed = relative_offset + size;
1216     if (bytes_needed > socket->receive_buffer_size)
1217     {
1218       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1219       {
1220         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1221                                                  bytes_needed);
1222         socket->receive_buffer_size = bytes_needed;
1223       }
1224       else
1225       {
1226         LOG (GNUNET_ERROR_TYPE_DEBUG,
1227              "%s: Cannot accommodate packet %d as buffer is full\n",
1228              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1229         return GNUNET_YES;
1230       }
1231     }
1232     /* Copy Data to buffer */
1233     payload = &msg[1];
1234     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1235     memcpy (socket->receive_buffer + relative_offset, payload, size);
1236     socket->receive_buffer_boundaries[relative_sequence_number] = 
1237         relative_offset + size;
1238     /* Modify the ACK bitmap */
1239     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1240                           GNUNET_YES);
1241     /* Start ACK sending task if one is not already present */
1242     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1243     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1244     {
1245       ack_deadline_rel = 
1246           GNUNET_TIME_relative_min (ack_deadline_rel,
1247                                     GNUNET_TIME_relative_multiply
1248                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1249       socket->ack_task_id = 
1250           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1251                                         (msg->ack_deadline), &ack_task, socket);
1252       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1253       socket->ack_time_deadline = ack_deadline_rel;
1254     }
1255     else
1256     {
1257       struct GNUNET_TIME_Relative ack_time_past;
1258       struct GNUNET_TIME_Relative ack_time_remaining;
1259       struct GNUNET_TIME_Relative ack_time_min;
1260       ack_time_past = 
1261           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1262       ack_time_remaining = GNUNET_TIME_relative_subtract
1263           (socket->ack_time_deadline, ack_time_past);
1264       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1265                                                ack_deadline_rel);
1266       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1267                       sizeof (struct GNUNET_TIME_Relative)))
1268       {
1269         ack_deadline_rel = ack_time_min;
1270         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1271         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1272                                                             &ack_task, socket);
1273         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1274         socket->ack_time_deadline = ack_deadline_rel;
1275       }
1276     }
1277     if ((NULL != socket->read_handle) /* A read handle is waiting */
1278         /* There is no current read task */
1279         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1280         /* We have the first packet */
1281         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1282     {
1283       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1284            GNUNET_i2s (&socket->other_peer));
1285       socket->read_handle->read_task_id =
1286           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1287     }
1288     break;
1289   default:
1290     LOG (GNUNET_ERROR_TYPE_DEBUG,
1291          "%s: Received data message when it cannot be handled\n",
1292          GNUNET_i2s (&socket->other_peer));
1293     break;
1294   }
1295   return GNUNET_YES;
1296 }
1297
1298
1299 /**
1300  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1301  *
1302  * @param cls the socket (set from GNUNET_MESH_connect)
1303  * @param tunnel connection to the other end
1304  * @param tunnel_ctx place to store local state associated with the tunnel
1305  * @param sender who sent the message
1306  * @param message the actual message
1307  * @param atsi performance data for the connection
1308  * @return GNUNET_OK to keep the connection open,
1309  *         GNUNET_SYSERR to close it (signal serious error)
1310  */
1311 static int
1312 client_handle_data (void *cls,
1313                     struct GNUNET_MESH_Tunnel *tunnel,
1314                     void **tunnel_ctx,
1315                     const struct GNUNET_PeerIdentity *sender,
1316                     const struct GNUNET_MessageHeader *message,
1317                     const struct GNUNET_ATS_Information*atsi)
1318 {
1319   struct GNUNET_STREAM_Socket *socket = cls;
1320
1321   return handle_data (socket, tunnel, sender, 
1322                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1323 }
1324
1325
1326 /**
1327  * Callback to set state to ESTABLISHED
1328  *
1329  * @param cls the closure NULL;
1330  * @param socket the socket to requiring state change
1331  */
1332 static void
1333 set_state_established (void *cls,
1334                        struct GNUNET_STREAM_Socket *socket)
1335 {
1336   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1337        "%s: Attaining ESTABLISHED state\n",
1338        GNUNET_i2s (&socket->other_peer));
1339   socket->write_offset = 0;
1340   socket->read_offset = 0;
1341   socket->state = STATE_ESTABLISHED;
1342   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1343                  socket->control_retransmission_task_id);
1344   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1345   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1346   if (NULL != socket->lsocket)
1347   {
1348     LOG (GNUNET_ERROR_TYPE_DEBUG,
1349          "%s: Calling listen callback\n",
1350          GNUNET_i2s (&socket->other_peer));
1351     if (GNUNET_SYSERR == 
1352         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1353                                     socket,
1354                                     &socket->other_peer))
1355     {
1356       socket->state = STATE_CLOSED;
1357       /* FIXME: We should close in a decent way (send RST) */
1358       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1359       GNUNET_free (socket);
1360     }
1361   }
1362   else
1363     socket->open_cb (socket->open_cls, socket);
1364 }
1365
1366
1367 /**
1368  * Callback to set state to HELLO_WAIT
1369  *
1370  * @param cls the closure from queue_message
1371  * @param socket the socket to requiring state change
1372  */
1373 static void
1374 set_state_hello_wait (void *cls,
1375                       struct GNUNET_STREAM_Socket *socket)
1376 {
1377   GNUNET_assert (STATE_INIT == socket->state);
1378   LOG (GNUNET_ERROR_TYPE_DEBUG,
1379        "%s: Attaining HELLO_WAIT state\n",
1380        GNUNET_i2s (&socket->other_peer));
1381   socket->state = STATE_HELLO_WAIT;
1382 }
1383
1384
1385 /**
1386  * Callback to set state to CLOSE_WAIT
1387  *
1388  * @param cls the closure from queue_message
1389  * @param socket the socket requiring state change
1390  */
1391 static void
1392 set_state_close_wait (void *cls,
1393                       struct GNUNET_STREAM_Socket *socket)
1394 {
1395   LOG (GNUNET_ERROR_TYPE_DEBUG,
1396        "%s: Attaing CLOSE_WAIT state\n",
1397        GNUNET_i2s (&socket->other_peer));
1398   socket->state = STATE_CLOSE_WAIT;
1399   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1400   socket->receive_buffer = NULL;
1401   socket->receive_buffer_size = 0;
1402 }
1403
1404
1405 /**
1406  * Callback to set state to RECEIVE_CLOSE_WAIT
1407  *
1408  * @param cls the closure from queue_message
1409  * @param socket the socket requiring state change
1410  */
1411 static void
1412 set_state_receive_close_wait (void *cls,
1413                               struct GNUNET_STREAM_Socket *socket)
1414 {
1415   LOG (GNUNET_ERROR_TYPE_DEBUG,
1416        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1417        GNUNET_i2s (&socket->other_peer));
1418   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1419   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1420   socket->receive_buffer = NULL;
1421   socket->receive_buffer_size = 0;
1422 }
1423
1424
1425 /**
1426  * Callback to set state to TRANSMIT_CLOSE_WAIT
1427  *
1428  * @param cls the closure from queue_message
1429  * @param socket the socket requiring state change
1430  */
1431 static void
1432 set_state_transmit_close_wait (void *cls,
1433                                struct GNUNET_STREAM_Socket *socket)
1434 {
1435   LOG (GNUNET_ERROR_TYPE_DEBUG,
1436        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1437        GNUNET_i2s (&socket->other_peer));
1438   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1439 }
1440
1441
1442 /**
1443  * Callback to set state to CLOSED
1444  *
1445  * @param cls the closure from queue_message
1446  * @param socket the socket requiring state change
1447  */
1448 static void
1449 set_state_closed (void *cls,
1450                   struct GNUNET_STREAM_Socket *socket)
1451 {
1452   socket->state = STATE_CLOSED;
1453 }
1454
1455
1456 /**
1457  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1458  *
1459  * @return the generate hello message
1460  */
1461 static struct GNUNET_MessageHeader *
1462 generate_hello (struct GNUNET_STREAM_Socket *socket)
1463 {
1464   struct GNUNET_STREAM_HelloMessage *msg;
1465
1466   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloMessage));
1467   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1468   msg->header.size = htons (sizeof (struct GNUNET_STREAM_HelloMessage));
1469   msg->port = GNUNET_htonll ((uint64_t) socket->port);
1470   return &msg->header;
1471 }
1472
1473
1474 /**
1475  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1476  * socket
1477  *
1478  * @param socket the socket for which this HelloAckMessage has to be generated
1479  * @param generate_seq GNUNET_YES to generate the write sequence number,
1480  *          GNUNET_NO to use the existing sequence number
1481  * @return the HelloAckMessage
1482  */
1483 static struct GNUNET_STREAM_HelloAckMessage *
1484 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1485                     int generate_seq)
1486 {
1487   struct GNUNET_STREAM_HelloAckMessage *msg;
1488
1489   if (GNUNET_YES == generate_seq)
1490   {
1491     if (GNUNET_YES == socket->testing_active)
1492       socket->write_sequence_number =
1493         socket->testing_set_write_sequence_number_value;
1494     else
1495       socket->write_sequence_number = 
1496         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1497     LOG_DEBUG ("%s: write sequence number %u\n",
1498                GNUNET_i2s (&socket->other_peer),
1499                (unsigned int) socket->write_sequence_number);
1500   }
1501   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1502   msg->header.size = 
1503     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1504   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1505   msg->sequence_number = htonl (socket->write_sequence_number);
1506   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1507   return msg;
1508 }
1509
1510
1511 /**
1512  * Task for retransmitting control messages if they aren't ACK'ed before a
1513  * deadline
1514  *
1515  * @param cls the socket
1516  * @param tc the Task context
1517  */
1518 static void
1519 control_retransmission_task (void *cls,
1520                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1521 {
1522   struct GNUNET_STREAM_Socket *socket = cls;
1523     
1524   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1525   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1526     return;
1527   LOG_DEBUG ("%s: Retransmitting a control message\n",
1528                  GNUNET_i2s (&socket->other_peer));
1529   switch (socket->state)
1530   {
1531   case STATE_INIT:    
1532     GNUNET_break (0);
1533     break;
1534   case STATE_LISTEN:
1535     GNUNET_break (0);
1536     break;
1537   case STATE_HELLO_WAIT:
1538     if (NULL == socket->lsocket) /* We are client */
1539       queue_message (socket, generate_hello (socket), NULL, NULL, GNUNET_NO);
1540     else
1541       queue_message (socket,
1542                      (struct GNUNET_MessageHeader *)
1543                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1544                      GNUNET_NO);
1545     socket->control_retransmission_task_id =
1546     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1547                                   &control_retransmission_task, socket);
1548     break;
1549   case STATE_ESTABLISHED:
1550     if (NULL == socket->lsocket)
1551       queue_message (socket,
1552                      (struct GNUNET_MessageHeader *)
1553                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1554                      GNUNET_NO);
1555     else
1556       GNUNET_break (0);
1557     break;
1558   default:
1559     GNUNET_break (0);
1560   }  
1561 }
1562
1563
1564 /**
1565  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1566  *
1567  * @param cls the socket (set from GNUNET_MESH_connect)
1568  * @param tunnel connection to the other end
1569  * @param tunnel_ctx this is NULL
1570  * @param sender who sent the message
1571  * @param message the actual message
1572  * @param atsi performance data for the connection
1573  * @return GNUNET_OK to keep the connection open,
1574  *         GNUNET_SYSERR to close it (signal serious error)
1575  */
1576 static int
1577 client_handle_hello_ack (void *cls,
1578                          struct GNUNET_MESH_Tunnel *tunnel,
1579                          void **tunnel_ctx,
1580                          const struct GNUNET_PeerIdentity *sender,
1581                          const struct GNUNET_MessageHeader *message,
1582                          const struct GNUNET_ATS_Information*atsi)
1583 {
1584   struct GNUNET_STREAM_Socket *socket = cls;
1585   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1586   struct GNUNET_STREAM_HelloAckMessage *reply;
1587
1588   if (0 != memcmp (sender, &socket->other_peer,
1589                    sizeof (struct GNUNET_PeerIdentity)))
1590   {
1591     LOG (GNUNET_ERROR_TYPE_DEBUG,
1592          "%s: Received HELLO_ACK from non-confirming peer\n",
1593          GNUNET_i2s (&socket->other_peer));
1594     return GNUNET_YES;
1595   }
1596   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1597   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1598        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1599   GNUNET_assert (socket->tunnel == tunnel);
1600   switch (socket->state)
1601   {
1602   case STATE_HELLO_WAIT:
1603     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1604     LOG_DEBUG ("%s: Read sequence number %u\n", 
1605                GNUNET_i2s (&socket->other_peer), socket->read_sequence_number);
1606     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1607     reply = generate_hello_ack (socket, GNUNET_YES);
1608     queue_message (socket, &reply->header, &set_state_established,
1609                    NULL, GNUNET_NO);    
1610     return GNUNET_OK;
1611   case STATE_ESTABLISHED:
1612     // call statistics (# ACKs ignored++)
1613     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1614                    socket->control_retransmission_task_id);
1615     socket->control_retransmission_task_id =
1616       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1617     return GNUNET_OK;
1618   default:
1619     LOG_DEBUG ("%1$s: Server %1$s sent HELLO_ACK when in state %2$d\n", 
1620                GNUNET_i2s (&socket->other_peer), socket->state);
1621     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1622     return GNUNET_SYSERR;
1623   }
1624 }
1625
1626
1627 /**
1628  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1629  *
1630  * @param cls the socket (set from GNUNET_MESH_connect)
1631  * @param tunnel connection to the other end
1632  * @param tunnel_ctx this is NULL
1633  * @param sender who sent the message
1634  * @param message the actual message
1635  * @param atsi performance data for the connection
1636  * @return GNUNET_OK to keep the connection open,
1637  *         GNUNET_SYSERR to close it (signal serious error)
1638  */
1639 static int
1640 client_handle_reset (void *cls,
1641                      struct GNUNET_MESH_Tunnel *tunnel,
1642                      void **tunnel_ctx,
1643                      const struct GNUNET_PeerIdentity *sender,
1644                      const struct GNUNET_MessageHeader *message,
1645                      const struct GNUNET_ATS_Information*atsi)
1646 {
1647   // struct GNUNET_STREAM_Socket *socket = cls;
1648
1649   return GNUNET_OK;
1650 }
1651
1652
1653 /**
1654  * Frees the socket's receive buffers, marks the socket as receive closed and
1655  * calls the DataProcessor with GNUNET_STREAM_SHUTDOWN status if a read handle
1656  * is present
1657  *
1658  * @param socket the socket
1659  */
1660 static void
1661 do_receive_shutdown (struct GNUNET_STREAM_Socket *socket)
1662 {
1663   socket->receive_closed = GNUNET_YES;  
1664   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1665   socket->receive_buffer = NULL;
1666   socket->receive_buffer_size = 0;
1667   if (NULL != socket->read_handle)
1668   {
1669     GNUNET_STREAM_DataProcessor proc;
1670     void *proc_cls;
1671
1672     proc = socket->read_handle->proc;
1673     proc_cls = socket->read_handle->proc_cls;
1674     GNUNET_STREAM_read_cancel (socket->read_handle);
1675     socket->read_handle = NULL;
1676     if (NULL != proc)
1677       proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
1678   }
1679 }
1680
1681
1682 /**
1683  * Marks the socket as transmit closed and calls the CompletionContinuation with
1684  * GNUNET_STREAM_SHUTDOWN status if a write handle is present
1685  *
1686  * @param socket the socket
1687  */
1688 static void
1689 do_transmit_shutdown (struct GNUNET_STREAM_Socket *socket)
1690 {
1691   socket->transmit_closed = GNUNET_YES;
1692   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1693      that that stream has been shutdown */
1694   if (NULL != socket->write_handle)
1695   {
1696     GNUNET_STREAM_CompletionContinuation wc;
1697     void *wc_cls;
1698
1699     wc = socket->write_handle->write_cont;
1700     wc_cls = socket->write_handle->write_cont_cls;
1701     GNUNET_STREAM_write_cancel (socket->write_handle);
1702     socket->write_handle = NULL;
1703     if (NULL != wc)
1704       wc (wc_cls,
1705           GNUNET_STREAM_SHUTDOWN, 0);
1706   }
1707 }
1708
1709
1710 /**
1711  * Common message handler for handling TRANSMIT_CLOSE messages
1712  *
1713  * @param socket the socket through which the ack was received
1714  * @param tunnel connection to the other end
1715  * @param sender who sent the message
1716  * @param msg the transmit close message
1717  * @param atsi performance data for the connection
1718  * @return GNUNET_OK to keep the connection open,
1719  *         GNUNET_SYSERR to close it (signal serious error)
1720  */
1721 static int
1722 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1723                        struct GNUNET_MESH_Tunnel *tunnel,
1724                        const struct GNUNET_PeerIdentity *sender,
1725                        const struct GNUNET_MessageHeader *msg,
1726                        const struct GNUNET_ATS_Information*atsi)
1727 {
1728   struct GNUNET_MessageHeader *reply;
1729
1730   switch (socket->state)
1731   {
1732   case STATE_INIT:
1733   case STATE_LISTEN:
1734   case STATE_HELLO_WAIT:
1735     LOG (GNUNET_ERROR_TYPE_DEBUG,
1736          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1737          GNUNET_i2s (&socket->other_peer));
1738     return GNUNET_OK;
1739   default:
1740     break;
1741   }
1742   /* Send TRANSMIT_CLOSE_ACK */
1743   reply = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
1744   reply->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1745   reply->size = htons (sizeof (struct GNUNET_MessageHeader));
1746   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1747   LOG (GNUNET_ERROR_TYPE_DEBUG, "%1$s: Received TRANSMIT_CLOSE from %1$s\n",
1748        GNUNET_i2s (&socket->other_peer));
1749   switch(socket->state)
1750   {
1751   case STATE_RECEIVE_CLOSED:
1752   case STATE_RECEIVE_CLOSE_WAIT:
1753   case STATE_CLOSE_WAIT:
1754   case STATE_CLOSED:
1755     return GNUNET_OK;
1756   default:
1757     break;
1758   }
1759   do_receive_shutdown (socket);
1760   if (GNUNET_YES == socket->transmit_closed)
1761     socket->state = STATE_CLOSED;
1762   else
1763     socket->state = STATE_RECEIVE_CLOSED;
1764   return GNUNET_OK;
1765 }
1766
1767
1768 /**
1769  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1770  *
1771  * @param cls the socket (set from GNUNET_MESH_connect)
1772  * @param tunnel connection to the other end
1773  * @param tunnel_ctx this is NULL
1774  * @param sender who sent the message
1775  * @param message the actual message
1776  * @param atsi performance data for the connection
1777  * @return GNUNET_OK to keep the connection open,
1778  *         GNUNET_SYSERR to close it (signal serious error)
1779  */
1780 static int
1781 client_handle_transmit_close (void *cls,
1782                               struct GNUNET_MESH_Tunnel *tunnel,
1783                               void **tunnel_ctx,
1784                               const struct GNUNET_PeerIdentity *sender,
1785                               const struct GNUNET_MessageHeader *message,
1786                               const struct GNUNET_ATS_Information*atsi)
1787 {
1788   struct GNUNET_STREAM_Socket *socket = cls;
1789   
1790   return handle_transmit_close (socket,
1791                                 tunnel,
1792                                 sender,
1793                                 (struct GNUNET_MessageHeader *)message,
1794                                 atsi);
1795 }
1796
1797
1798 /**
1799  * Task for calling the shutdown continuation callback
1800  *
1801  * @param cls the socket
1802  * @param tc the scheduler task context
1803  */
1804 static void
1805 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1806 {
1807   struct GNUNET_STREAM_Socket *socket = cls;
1808   
1809   GNUNET_assert (NULL != socket->shutdown_handle);
1810   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1811   if (NULL != socket->shutdown_handle->completion_cb)
1812     socket->shutdown_handle->completion_cb
1813         (socket->shutdown_handle->completion_cls,
1814          socket->shutdown_handle->operation);
1815   GNUNET_free (socket->shutdown_handle);
1816   socket->shutdown_handle = NULL;
1817 }
1818
1819
1820 /**
1821  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1822  *
1823  * @param socket the socket
1824  * @param tunnel connection to the other end
1825  * @param sender who sent the message
1826  * @param message the actual message
1827  * @param atsi performance data for the connection
1828  * @param operation the close operation which is being ACK'ed
1829  * @return GNUNET_OK to keep the connection open,
1830  *         GNUNET_SYSERR to close it (signal serious error)
1831  */
1832 static int
1833 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1834                           struct GNUNET_MESH_Tunnel *tunnel,
1835                           const struct GNUNET_PeerIdentity *sender,
1836                           const struct GNUNET_MessageHeader *message,
1837                           const struct GNUNET_ATS_Information *atsi,
1838                           int operation)
1839 {
1840   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1841
1842   shutdown_handle = socket->shutdown_handle;
1843   if (NULL == shutdown_handle)
1844   {
1845     /* This may happen when the shudown handle is cancelled */
1846     LOG (GNUNET_ERROR_TYPE_DEBUG,
1847          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1848          GNUNET_i2s (&socket->other_peer));
1849     return GNUNET_OK;
1850   }
1851   switch (operation)
1852   {
1853   case SHUT_RDWR:
1854     switch (socket->state)
1855     {
1856     case STATE_CLOSE_WAIT:
1857       if (SHUT_RDWR != shutdown_handle->operation)
1858       {
1859         LOG (GNUNET_ERROR_TYPE_DEBUG,
1860              "%s: Received CLOSE_ACK when shutdown handle is not for "
1861              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1862         return GNUNET_OK;
1863       }
1864       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1865            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1866       socket->state = STATE_CLOSED;
1867       break;
1868     default:
1869       LOG (GNUNET_ERROR_TYPE_DEBUG,
1870            "%s: Received CLOSE_ACK when it is not expected\n",
1871            GNUNET_i2s (&socket->other_peer));
1872       return GNUNET_OK;
1873     }
1874     break;
1875   case SHUT_RD:
1876     switch (socket->state)
1877     {
1878     case STATE_RECEIVE_CLOSE_WAIT:
1879       if (SHUT_RD != shutdown_handle->operation)
1880       {
1881         LOG (GNUNET_ERROR_TYPE_DEBUG,
1882              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1883              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1884         return GNUNET_OK;
1885       }
1886       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1887            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1888       socket->state = STATE_RECEIVE_CLOSED;
1889       break;
1890     default:
1891       LOG (GNUNET_ERROR_TYPE_DEBUG,
1892            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1893            GNUNET_i2s (&socket->other_peer));
1894       return GNUNET_OK;
1895     }
1896     break;
1897   case SHUT_WR:
1898     switch (socket->state)
1899     {
1900     case STATE_TRANSMIT_CLOSE_WAIT:
1901       if (SHUT_WR != shutdown_handle->operation)
1902       {
1903         LOG (GNUNET_ERROR_TYPE_DEBUG,
1904              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1905              "is not for SHUT_WR\n",
1906              GNUNET_i2s (&socket->other_peer));
1907         return GNUNET_OK;
1908       }
1909       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1910            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1911       socket->state = STATE_TRANSMIT_CLOSED;
1912       break;
1913     default:
1914       LOG (GNUNET_ERROR_TYPE_DEBUG,
1915            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1916            GNUNET_i2s (&socket->other_peer));          
1917       return GNUNET_OK;
1918     }
1919     break;
1920   default:
1921     GNUNET_assert (0);
1922   }
1923   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1924       (&call_cont_task, socket);
1925   if (GNUNET_SCHEDULER_NO_TASK
1926       != shutdown_handle->close_msg_retransmission_task_id)
1927   {
1928     GNUNET_SCHEDULER_cancel
1929       (shutdown_handle->close_msg_retransmission_task_id);
1930     shutdown_handle->close_msg_retransmission_task_id =
1931         GNUNET_SCHEDULER_NO_TASK;
1932   }
1933   return GNUNET_OK;
1934 }
1935
1936
1937 /**
1938  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1939  *
1940  * @param cls the socket (set from GNUNET_MESH_connect)
1941  * @param tunnel connection to the other end
1942  * @param tunnel_ctx this is NULL
1943  * @param sender who sent the message
1944  * @param message the actual message
1945  * @param atsi performance data for the connection
1946  * @return GNUNET_OK to keep the connection open,
1947  *         GNUNET_SYSERR to close it (signal serious error)
1948  */
1949 static int
1950 client_handle_transmit_close_ack (void *cls,
1951                                   struct GNUNET_MESH_Tunnel *tunnel,
1952                                   void **tunnel_ctx,
1953                                   const struct GNUNET_PeerIdentity *sender,
1954                                   const struct GNUNET_MessageHeader *message,
1955                                   const struct GNUNET_ATS_Information*atsi)
1956 {
1957   struct GNUNET_STREAM_Socket *socket = cls;
1958
1959   return handle_generic_close_ack (socket,
1960                                    tunnel,
1961                                    sender,
1962                                    (const struct GNUNET_MessageHeader *)
1963                                    message,
1964                                    atsi,
1965                                    SHUT_WR);
1966 }
1967
1968
1969 /**
1970  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1971  *
1972  * @param socket the socket
1973  * @param tunnel connection to the other end
1974  * @param sender who sent the message
1975  * @param message the actual message
1976  * @param atsi performance data for the connection
1977  * @return GNUNET_OK to keep the connection open,
1978  *         GNUNET_SYSERR to close it (signal serious error)
1979  */
1980 static int
1981 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1982                       struct GNUNET_MESH_Tunnel *tunnel,
1983                       const struct GNUNET_PeerIdentity *sender,
1984                       const struct GNUNET_MessageHeader *message,
1985                       const struct GNUNET_ATS_Information *atsi)
1986 {
1987   struct GNUNET_MessageHeader *receive_close_ack;
1988
1989   switch (socket->state)
1990   {
1991   case STATE_INIT:
1992   case STATE_LISTEN:
1993   case STATE_HELLO_WAIT:
1994     LOG (GNUNET_ERROR_TYPE_DEBUG,
1995          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1996          GNUNET_i2s (&socket->other_peer));
1997     return GNUNET_OK;
1998   default:
1999     break;
2000   }  
2001   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
2002        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));  
2003   receive_close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
2004   receive_close_ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2005   receive_close_ack->type =
2006       htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
2007   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
2008   switch (socket->state)
2009   {
2010   case STATE_TRANSMIT_CLOSED:
2011   case STATE_TRANSMIT_CLOSE_WAIT:
2012   case STATE_CLOSED:
2013   case STATE_CLOSE_WAIT:
2014     return GNUNET_OK;
2015   default:
2016     break;
2017   }
2018   do_transmit_shutdown (socket);
2019   if (GNUNET_YES == socket->receive_closed)
2020     socket->state = STATE_CLOSED;
2021   else
2022     socket->state = STATE_TRANSMIT_CLOSED;
2023   return GNUNET_OK;
2024 }
2025
2026
2027 /**
2028  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2029  *
2030  * @param cls the socket (set from GNUNET_MESH_connect)
2031  * @param tunnel connection to the other end
2032  * @param tunnel_ctx this is NULL
2033  * @param sender who sent the message
2034  * @param message the actual message
2035  * @param atsi performance data for the connection
2036  * @return GNUNET_OK to keep the connection open,
2037  *         GNUNET_SYSERR to close it (signal serious error)
2038  */
2039 static int
2040 client_handle_receive_close (void *cls,
2041                              struct GNUNET_MESH_Tunnel *tunnel,
2042                              void **tunnel_ctx,
2043                              const struct GNUNET_PeerIdentity *sender,
2044                              const struct GNUNET_MessageHeader *message,
2045                              const struct GNUNET_ATS_Information*atsi)
2046 {
2047   struct GNUNET_STREAM_Socket *socket = cls;
2048
2049   return
2050     handle_receive_close (socket,
2051                           tunnel,
2052                           sender,
2053                           (const struct GNUNET_MessageHeader *) message,
2054                           atsi);
2055 }
2056
2057
2058 /**
2059  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2060  *
2061  * @param cls the socket (set from GNUNET_MESH_connect)
2062  * @param tunnel connection to the other end
2063  * @param tunnel_ctx this is NULL
2064  * @param sender who sent the message
2065  * @param message the actual message
2066  * @param atsi performance data for the connection
2067  * @return GNUNET_OK to keep the connection open,
2068  *         GNUNET_SYSERR to close it (signal serious error)
2069  */
2070 static int
2071 client_handle_receive_close_ack (void *cls,
2072                                  struct GNUNET_MESH_Tunnel *tunnel,
2073                                  void **tunnel_ctx,
2074                                  const struct GNUNET_PeerIdentity *sender,
2075                                  const struct GNUNET_MessageHeader *message,
2076                                  const struct GNUNET_ATS_Information*atsi)
2077 {
2078   struct GNUNET_STREAM_Socket *socket = cls;
2079
2080   return handle_generic_close_ack (socket,
2081                                    tunnel,
2082                                    sender,
2083                                    (const struct GNUNET_MessageHeader *)
2084                                    message,
2085                                    atsi,
2086                                    SHUT_RD);
2087 }
2088
2089
2090 /**
2091  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2092  *
2093  * @param socket the socket
2094  * @param tunnel connection to the other end
2095  * @param sender who sent the message
2096  * @param message the actual message
2097  * @param atsi performance data for the connection
2098  * @return GNUNET_OK to keep the connection open,
2099  *         GNUNET_SYSERR to close it (signal serious error)
2100  */
2101 static int
2102 handle_close (struct GNUNET_STREAM_Socket *socket,
2103               struct GNUNET_MESH_Tunnel *tunnel,
2104               const struct GNUNET_PeerIdentity *sender,
2105               const struct GNUNET_MessageHeader *message,
2106               const struct GNUNET_ATS_Information*atsi)
2107 {
2108   struct GNUNET_MessageHeader *close_ack;
2109
2110   switch (socket->state)
2111   {
2112   case STATE_INIT:
2113   case STATE_LISTEN:
2114   case STATE_HELLO_WAIT:
2115     LOG (GNUNET_ERROR_TYPE_DEBUG,
2116          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2117          GNUNET_i2s (&socket->other_peer));
2118     return GNUNET_OK;
2119   default:
2120     break;
2121   }
2122   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2123        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2124   close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
2125   close_ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2126   close_ack->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2127   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2128   if ((STATE_CLOSED == socket->state) || (STATE_CLOSE_WAIT == socket->state))
2129     return GNUNET_OK;
2130   if (GNUNET_NO == socket->transmit_closed)
2131     do_transmit_shutdown (socket);
2132   if (GNUNET_NO == socket->receive_closed)
2133     do_receive_shutdown (socket);
2134   return GNUNET_OK;
2135 }
2136
2137
2138 /**
2139  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2140  *
2141  * @param cls the socket (set from GNUNET_MESH_connect)
2142  * @param tunnel connection to the other end
2143  * @param tunnel_ctx this is NULL
2144  * @param sender who sent the message
2145  * @param message the actual message
2146  * @param atsi performance data for the connection
2147  * @return GNUNET_OK to keep the connection open,
2148  *         GNUNET_SYSERR to close it (signal serious error)
2149  */
2150 static int
2151 client_handle_close (void *cls,
2152                      struct GNUNET_MESH_Tunnel *tunnel,
2153                      void **tunnel_ctx,
2154                      const struct GNUNET_PeerIdentity *sender,
2155                      const struct GNUNET_MessageHeader *message,
2156                      const struct GNUNET_ATS_Information*atsi)
2157 {
2158   struct GNUNET_STREAM_Socket *socket = cls;
2159
2160   return handle_close (socket,
2161                        tunnel,
2162                        sender,
2163                        (const struct GNUNET_MessageHeader *) message,
2164                        atsi);
2165 }
2166
2167
2168 /**
2169  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2170  *
2171  * @param cls the socket (set from GNUNET_MESH_connect)
2172  * @param tunnel connection to the other end
2173  * @param tunnel_ctx this is NULL
2174  * @param sender who sent the message
2175  * @param message the actual message
2176  * @param atsi performance data for the connection
2177  * @return GNUNET_OK to keep the connection open,
2178  *         GNUNET_SYSERR to close it (signal serious error)
2179  */
2180 static int
2181 client_handle_close_ack (void *cls,
2182                          struct GNUNET_MESH_Tunnel *tunnel,
2183                          void **tunnel_ctx,
2184                          const struct GNUNET_PeerIdentity *sender,
2185                          const struct GNUNET_MessageHeader *message,
2186                          const struct GNUNET_ATS_Information *atsi)
2187 {
2188   struct GNUNET_STREAM_Socket *socket = cls;
2189
2190   return handle_generic_close_ack (socket,
2191                                    tunnel,
2192                                    sender,
2193                                    (const struct GNUNET_MessageHeader *) 
2194                                    message,
2195                                    atsi,
2196                                    SHUT_RDWR);
2197 }
2198
2199 /*****************************/
2200 /* Server's Message Handlers */
2201 /*****************************/
2202
2203 /**
2204  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2205  *
2206  * @param cls the closure
2207  * @param tunnel connection to the other end
2208  * @param tunnel_ctx the socket
2209  * @param sender who sent the message
2210  * @param message the actual message
2211  * @param atsi performance data for the connection
2212  * @return GNUNET_OK to keep the connection open,
2213  *         GNUNET_SYSERR to close it (signal serious error)
2214  */
2215 static int
2216 server_handle_data (void *cls,
2217                     struct GNUNET_MESH_Tunnel *tunnel,
2218                     void **tunnel_ctx,
2219                     const struct GNUNET_PeerIdentity *sender,
2220                     const struct GNUNET_MessageHeader *message,
2221                     const struct GNUNET_ATS_Information*atsi)
2222 {
2223   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2224
2225   return handle_data (socket,
2226                       tunnel,
2227                       sender,
2228                       (const struct GNUNET_STREAM_DataMessage *)message,
2229                       atsi);
2230 }
2231
2232
2233 /**
2234  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2235  *
2236  * @param cls the closure
2237  * @param tunnel connection to the other end
2238  * @param tunnel_ctx the socket
2239  * @param sender who sent the message
2240  * @param message the actual message
2241  * @param atsi performance data for the connection
2242  * @return GNUNET_OK to keep the connection open,
2243  *         GNUNET_SYSERR to close it (signal serious error)
2244  */
2245 static int
2246 server_handle_hello (void *cls,
2247                      struct GNUNET_MESH_Tunnel *tunnel,
2248                      void **tunnel_ctx,
2249                      const struct GNUNET_PeerIdentity *sender,
2250                      const struct GNUNET_MessageHeader *message,
2251                      const struct GNUNET_ATS_Information*atsi)
2252 {
2253   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2254   const struct GNUNET_STREAM_HelloMessage *hello;
2255   struct GNUNET_STREAM_HelloAckMessage *reply;
2256   uint32_t port;
2257
2258   hello = (const struct GNUNET_STREAM_HelloMessage *) message;
2259   if (0 != memcmp (sender,
2260                    &socket->other_peer,
2261                    sizeof (struct GNUNET_PeerIdentity)))
2262   {
2263     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2264                GNUNET_i2s (&socket->other_peer));
2265     return GNUNET_YES;
2266   }
2267   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2268   GNUNET_assert (socket->tunnel == tunnel);
2269   LOG_DEBUG ("%1$s: Received HELLO from %1$s\n", 
2270              GNUNET_i2s (&socket->other_peer));
2271   port = (uint32_t) GNUNET_ntohll (hello->port);
2272   switch (socket->state)
2273   {
2274   case STATE_INIT:
2275     if (port != socket->port)
2276     {
2277       LOG_DEBUG ("Ignoring HELLO for port %u\n", port);
2278       GNUNET_MESH_tunnel_destroy (tunnel);
2279       GNUNET_free (socket);
2280       return GNUNET_OK;
2281     }
2282     reply = generate_hello_ack (socket, GNUNET_YES);
2283     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2284                    GNUNET_NO);
2285     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2286                    socket->control_retransmission_task_id);
2287     socket->control_retransmission_task_id =
2288       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2289                                     &control_retransmission_task, socket);
2290     break;
2291   case STATE_HELLO_WAIT:
2292     /* Perhaps our HELLO_ACK was lost */
2293     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2294                    socket->control_retransmission_task_id);
2295     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2296     socket->control_retransmission_task_id =
2297       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2298     break;
2299   default:
2300     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2301                GNUNET_i2s (&socket->other_peer), socket->state);
2302     /* FIXME: Send RESET? */
2303   }
2304   return GNUNET_OK;
2305 }
2306
2307
2308 /**
2309  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2310  *
2311  * @param cls the closure
2312  * @param tunnel connection to the other end
2313  * @param tunnel_ctx the socket
2314  * @param sender who sent the message
2315  * @param message the actual message
2316  * @param atsi performance data for the connection
2317  * @return GNUNET_OK to keep the connection open,
2318  *         GNUNET_SYSERR to close it (signal serious error)
2319  */
2320 static int
2321 server_handle_hello_ack (void *cls,
2322                          struct GNUNET_MESH_Tunnel *tunnel,
2323                          void **tunnel_ctx,
2324                          const struct GNUNET_PeerIdentity *sender,
2325                          const struct GNUNET_MessageHeader *message,
2326                          const struct GNUNET_ATS_Information*atsi)
2327 {
2328   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2329   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2330
2331   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2332                  ntohs (message->type));
2333   GNUNET_assert (socket->tunnel == tunnel);
2334   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2335   switch (socket->state)  
2336   {
2337   case STATE_HELLO_WAIT:
2338     LOG (GNUNET_ERROR_TYPE_DEBUG,
2339          "%s: Received HELLO_ACK from %s\n",
2340          GNUNET_i2s (&socket->other_peer),
2341          GNUNET_i2s (&socket->other_peer));
2342     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2343     LOG (GNUNET_ERROR_TYPE_DEBUG,
2344          "%s: Read sequence number %u\n",
2345          GNUNET_i2s (&socket->other_peer),
2346          (unsigned int) socket->read_sequence_number);
2347     socket->receiver_window_available = 
2348       ntohl (ack_message->receiver_window_size);
2349     set_state_established (NULL, socket);
2350     break;
2351   default:
2352     LOG (GNUNET_ERROR_TYPE_DEBUG,
2353          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2354   }
2355   return GNUNET_OK;
2356 }
2357
2358
2359 /**
2360  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2361  *
2362  * @param cls the closure
2363  * @param tunnel connection to the other end
2364  * @param tunnel_ctx the socket
2365  * @param sender who sent the message
2366  * @param message the actual message
2367  * @param atsi performance data for the connection
2368  * @return GNUNET_OK to keep the connection open,
2369  *         GNUNET_SYSERR to close it (signal serious error)
2370  */
2371 static int
2372 server_handle_reset (void *cls,
2373                      struct GNUNET_MESH_Tunnel *tunnel,
2374                      void **tunnel_ctx,
2375                      const struct GNUNET_PeerIdentity *sender,
2376                      const struct GNUNET_MessageHeader *message,
2377                      const struct GNUNET_ATS_Information*atsi)
2378 {
2379   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2380   /* FIXME */
2381   return GNUNET_OK;
2382 }
2383
2384
2385 /**
2386  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2387  *
2388  * @param cls the closure
2389  * @param tunnel connection to the other end
2390  * @param tunnel_ctx the socket
2391  * @param sender who sent the message
2392  * @param message the actual message
2393  * @param atsi performance data for the connection
2394  * @return GNUNET_OK to keep the connection open,
2395  *         GNUNET_SYSERR to close it (signal serious error)
2396  */
2397 static int
2398 server_handle_transmit_close (void *cls,
2399                               struct GNUNET_MESH_Tunnel *tunnel,
2400                               void **tunnel_ctx,
2401                               const struct GNUNET_PeerIdentity *sender,
2402                               const struct GNUNET_MessageHeader *message,
2403                               const struct GNUNET_ATS_Information*atsi)
2404 {
2405   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2406
2407   return handle_transmit_close (socket, tunnel, sender, message, atsi);
2408 }
2409
2410
2411 /**
2412  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2413  *
2414  * @param cls the closure
2415  * @param tunnel connection to the other end
2416  * @param tunnel_ctx the socket
2417  * @param sender who sent the message
2418  * @param message the actual message
2419  * @param atsi performance data for the connection
2420  * @return GNUNET_OK to keep the connection open,
2421  *         GNUNET_SYSERR to close it (signal serious error)
2422  */
2423 static int
2424 server_handle_transmit_close_ack (void *cls,
2425                                   struct GNUNET_MESH_Tunnel *tunnel,
2426                                   void **tunnel_ctx,
2427                                   const struct GNUNET_PeerIdentity *sender,
2428                                   const struct GNUNET_MessageHeader *message,
2429                                   const struct GNUNET_ATS_Information*atsi)
2430 {
2431   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2432
2433   return handle_generic_close_ack (socket, tunnel, sender, message, atsi,
2434                                    SHUT_WR);
2435 }
2436
2437
2438 /**
2439  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2440  *
2441  * @param cls the closure
2442  * @param tunnel connection to the other end
2443  * @param tunnel_ctx the socket
2444  * @param sender who sent the message
2445  * @param message the actual message
2446  * @param atsi performance data for the connection
2447  * @return GNUNET_OK to keep the connection open,
2448  *         GNUNET_SYSERR to close it (signal serious error)
2449  */
2450 static int
2451 server_handle_receive_close (void *cls,
2452                              struct GNUNET_MESH_Tunnel *tunnel,
2453                              void **tunnel_ctx,
2454                              const struct GNUNET_PeerIdentity *sender,
2455                              const struct GNUNET_MessageHeader *message,
2456                              const struct GNUNET_ATS_Information*atsi)
2457 {
2458   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2459
2460   return handle_receive_close (socket, tunnel, sender, message, atsi);
2461 }
2462
2463
2464 /**
2465  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2466  *
2467  * @param cls the closure
2468  * @param tunnel connection to the other end
2469  * @param tunnel_ctx the socket
2470  * @param sender who sent the message
2471  * @param message the actual message
2472  * @param atsi performance data for the connection
2473  * @return GNUNET_OK to keep the connection open,
2474  *         GNUNET_SYSERR to close it (signal serious error)
2475  */
2476 static int
2477 server_handle_receive_close_ack (void *cls,
2478                                  struct GNUNET_MESH_Tunnel *tunnel,
2479                                  void **tunnel_ctx,
2480                                  const struct GNUNET_PeerIdentity *sender,
2481                                  const struct GNUNET_MessageHeader *message,
2482                                  const struct GNUNET_ATS_Information*atsi)
2483 {
2484   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2485
2486   return handle_generic_close_ack (socket, tunnel, sender, message, atsi,
2487                                    SHUT_RD);
2488 }
2489
2490
2491 /**
2492  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2493  *
2494  * @param cls the listen socket (from GNUNET_MESH_connect in
2495  *          GNUNET_STREAM_listen) 
2496  * @param tunnel connection to the other end
2497  * @param tunnel_ctx the socket
2498  * @param sender who sent the message
2499  * @param message the actual message
2500  * @param atsi performance data for the connection
2501  * @return GNUNET_OK to keep the connection open,
2502  *         GNUNET_SYSERR to close it (signal serious error)
2503  */
2504 static int
2505 server_handle_close (void *cls,
2506                      struct GNUNET_MESH_Tunnel *tunnel,
2507                      void **tunnel_ctx,
2508                      const struct GNUNET_PeerIdentity *sender,
2509                      const struct GNUNET_MessageHeader *message,
2510                      const struct GNUNET_ATS_Information*atsi)
2511 {
2512   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2513   
2514   return handle_close (socket, tunnel, sender, message, atsi);
2515 }
2516
2517
2518 /**
2519  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2520  *
2521  * @param cls the closure
2522  * @param tunnel connection to the other end
2523  * @param tunnel_ctx the socket
2524  * @param sender who sent the message
2525  * @param message the actual message
2526  * @param atsi performance data for the connection
2527  * @return GNUNET_OK to keep the connection open,
2528  *         GNUNET_SYSERR to close it (signal serious error)
2529  */
2530 static int
2531 server_handle_close_ack (void *cls,
2532                          struct GNUNET_MESH_Tunnel *tunnel,
2533                          void **tunnel_ctx,
2534                          const struct GNUNET_PeerIdentity *sender,
2535                          const struct GNUNET_MessageHeader *message,
2536                          const struct GNUNET_ATS_Information*atsi)
2537 {
2538   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2539
2540   return handle_generic_close_ack (socket, tunnel, sender, message, atsi,
2541                                    SHUT_RDWR);
2542 }
2543
2544
2545 /**
2546  * Handler for DATA_ACK messages
2547  *
2548  * @param socket the socket through which the ack was received
2549  * @param tunnel connection to the other end
2550  * @param sender who sent the message
2551  * @param ack the acknowledgment message
2552  * @param atsi performance data for the connection
2553  * @return GNUNET_OK to keep the connection open,
2554  *         GNUNET_SYSERR to close it (signal serious error)
2555  */
2556 static int
2557 handle_ack (struct GNUNET_STREAM_Socket *socket,
2558             struct GNUNET_MESH_Tunnel *tunnel,
2559             const struct GNUNET_PeerIdentity *sender,
2560             const struct GNUNET_STREAM_AckMessage *ack,
2561             const struct GNUNET_ATS_Information*atsi)
2562 {
2563   struct GNUNET_STREAM_WriteHandle *write_handle;
2564   uint64_t ack_bitmap;
2565   unsigned int packet;
2566   int need_retransmission;
2567   uint32_t sequence_difference;
2568
2569   if (0 != memcmp (sender,
2570                    &socket->other_peer,
2571                    sizeof (struct GNUNET_PeerIdentity)))
2572   {
2573     LOG (GNUNET_ERROR_TYPE_DEBUG,
2574          "%s: Received ACK from non-confirming peer\n",
2575          GNUNET_i2s (&socket->other_peer));
2576     return GNUNET_YES;
2577   }
2578   switch (socket->state)
2579   {
2580   case (STATE_ESTABLISHED):
2581   case (STATE_RECEIVE_CLOSED):
2582   case (STATE_RECEIVE_CLOSE_WAIT):
2583     if (NULL == socket->write_handle)
2584     {
2585       LOG (GNUNET_ERROR_TYPE_DEBUG,
2586            "%s: Received DATA_ACK when write_handle is NULL\n",
2587            GNUNET_i2s (&socket->other_peer));
2588       return GNUNET_OK;
2589     }
2590     sequence_difference = 
2591         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2592     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2593     {
2594       LOG (GNUNET_ERROR_TYPE_DEBUG,
2595            "%s: Received DATA_ACK with unexpected base sequence number\n",
2596            GNUNET_i2s (&socket->other_peer));
2597       LOG (GNUNET_ERROR_TYPE_DEBUG,
2598            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2599            GNUNET_i2s (&socket->other_peer),
2600            socket->write_sequence_number,
2601            ntohl (ack->base_sequence_number));
2602       return GNUNET_OK;
2603     }
2604     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2605          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2606     /* Cancel the retransmission task */
2607     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2608     {
2609       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2610       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2611       socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
2612     }
2613     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2614     {
2615       if (NULL == socket->write_handle->messages[packet])
2616         break;
2617       /* BS: Base sequence from ack; PS: sequence num of current packet */
2618       sequence_difference = ntohl (ack->base_sequence_number)
2619         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2620       if (0 == sequence_difference)
2621         break; /* The message in our handle is not yet received */
2622       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2623       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2624       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2625                             packet, GNUNET_YES);
2626     }
2627     if (((ntohl (ack->base_sequence_number)
2628          - (socket->write_handle->max_ack_base_num))
2629           <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2630     {
2631       socket->write_handle->max_ack_base_num = ntohl (ack->base_sequence_number);
2632       socket->receiver_window_available = 
2633           ntohl (ack->receive_window_remaining);
2634     }
2635     else
2636       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2637                   "Ignoring to modify receive window available as base: %u, max_ack_base: %u\n",
2638                   ntohl (ack->base_sequence_number),
2639                    socket->write_handle->max_ack_base_num);
2640     if ((packet == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2641         || ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2642             && (NULL == socket->write_handle->messages[packet])))
2643       goto call_write_cont_cb;
2644     GNUNET_assert (ntohl
2645                    (socket->write_handle->messages[packet]->sequence_number)
2646                    == ntohl (ack->base_sequence_number));
2647     /* Update our bitmap */
2648     ack_bitmap = GNUNET_ntohll (ack->bitmap);
2649     for (; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2650     {
2651       if (NULL == socket->write_handle->messages[packet]) break;
2652       if (ackbitmap_is_bit_set (&ack_bitmap, ntohl
2653                                 (socket->write_handle->messages[packet]->sequence_number)
2654                                 - ntohl (ack->base_sequence_number)))
2655         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet, GNUNET_YES);
2656     }
2657     /* Check if we have received all acknowledgements */
2658     need_retransmission = GNUNET_NO;
2659     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2660     {
2661       if (NULL == socket->write_handle->messages[packet]) break;
2662       if (GNUNET_YES != ackbitmap_is_bit_set 
2663           (&socket->write_handle->ack_bitmap,packet))
2664       {
2665         need_retransmission = GNUNET_YES;
2666         break;
2667       }
2668     }
2669     if (GNUNET_YES == need_retransmission)
2670     {
2671       write_data (socket);
2672       return GNUNET_OK;
2673     }
2674
2675   call_write_cont_cb:
2676     /* Free the packets */
2677     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2678     {
2679       GNUNET_free_non_null (socket->write_handle->messages[packet]);
2680     }
2681     write_handle = socket->write_handle;
2682     socket->write_handle = NULL;
2683     if (NULL != write_handle->write_cont)
2684       write_handle->write_cont (write_handle->write_cont_cls,
2685                                 GNUNET_STREAM_OK,
2686                                 write_handle->size);
2687     /* We are done with the write handle - Freeing it */
2688     GNUNET_free (write_handle);
2689     LOG (GNUNET_ERROR_TYPE_DEBUG,
2690          "%s: Write completion callback completed\n",
2691          GNUNET_i2s (&socket->other_peer));      
2692     break;
2693   default:
2694     break;
2695   }
2696   return GNUNET_OK;
2697 }
2698
2699
2700 /**
2701  * Handler for DATA_ACK messages
2702  *
2703  * @param cls the 'struct GNUNET_STREAM_Socket'
2704  * @param tunnel connection to the other end
2705  * @param tunnel_ctx unused
2706  * @param sender who sent the message
2707  * @param message the actual message
2708  * @param atsi performance data for the connection
2709  * @return GNUNET_OK to keep the connection open,
2710  *         GNUNET_SYSERR to close it (signal serious error)
2711  */
2712 static int
2713 client_handle_ack (void *cls,
2714                    struct GNUNET_MESH_Tunnel *tunnel,
2715                    void **tunnel_ctx,
2716                    const struct GNUNET_PeerIdentity *sender,
2717                    const struct GNUNET_MessageHeader *message,
2718                    const struct GNUNET_ATS_Information*atsi)
2719 {
2720   struct GNUNET_STREAM_Socket *socket = cls;
2721   const struct GNUNET_STREAM_AckMessage *ack;
2722
2723   ack = (const struct GNUNET_STREAM_AckMessage *) message; 
2724   return handle_ack (socket, tunnel, sender, ack, atsi);
2725 }
2726
2727
2728 /**
2729  * Handler for DATA_ACK messages
2730  *
2731  * @param cls the server's listen socket
2732  * @param tunnel connection to the other end
2733  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2734  * @param sender who sent the message
2735  * @param message the actual message
2736  * @param atsi performance data for the connection
2737  * @return GNUNET_OK to keep the connection open,
2738  *         GNUNET_SYSERR to close it (signal serious error)
2739  */
2740 static int
2741 server_handle_ack (void *cls,
2742                    struct GNUNET_MESH_Tunnel *tunnel,
2743                    void **tunnel_ctx,
2744                    const struct GNUNET_PeerIdentity *sender,
2745                    const struct GNUNET_MessageHeader *message,
2746                    const struct GNUNET_ATS_Information*atsi)
2747 {
2748   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2749   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2750  
2751   return handle_ack (socket, tunnel, sender, ack, atsi);
2752 }
2753
2754
2755 /**
2756  * For client message handlers, the stream socket is in the
2757  * closure argument.
2758  */
2759 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2760   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2761   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2762    sizeof (struct GNUNET_STREAM_AckMessage) },
2763   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2764    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2765   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2766    sizeof (struct GNUNET_MessageHeader)},
2767   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2768    sizeof (struct GNUNET_MessageHeader)},
2769   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2770    sizeof (struct GNUNET_MessageHeader)},
2771   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2772    sizeof (struct GNUNET_MessageHeader)},
2773   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2774    sizeof (struct GNUNET_MessageHeader)},
2775   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2776    sizeof (struct GNUNET_MessageHeader)},
2777   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2778    sizeof (struct GNUNET_MessageHeader)},
2779   {NULL, 0, 0}
2780 };
2781
2782
2783 /**
2784  * For server message handlers, the stream socket is in the
2785  * tunnel context, and the listen socket in the closure argument.
2786  */
2787 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2788   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2789   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2790    sizeof (struct GNUNET_STREAM_AckMessage) },
2791   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2792    sizeof (struct GNUNET_STREAM_HelloMessage)},
2793   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2794    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2795   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2796    sizeof (struct GNUNET_MessageHeader)},
2797   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2798    sizeof (struct GNUNET_MessageHeader)},
2799   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2800    sizeof (struct GNUNET_MessageHeader)},
2801   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2802    sizeof (struct GNUNET_MessageHeader)},
2803   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2804    sizeof (struct GNUNET_MessageHeader)},
2805   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2806    sizeof (struct GNUNET_MessageHeader)},
2807   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2808    sizeof (struct GNUNET_MessageHeader)},
2809   {NULL, 0, 0}
2810 };
2811
2812
2813 /**
2814  * Function called when our target peer is connected to our tunnel
2815  *
2816  * @param cls the socket for which this tunnel is created
2817  * @param peer the peer identity of the target
2818  * @param atsi performance data for the connection
2819  */
2820 static void
2821 mesh_peer_connect_callback (void *cls,
2822                             const struct GNUNET_PeerIdentity *peer,
2823                             const struct GNUNET_ATS_Information * atsi)
2824 {
2825   struct GNUNET_STREAM_Socket *socket = cls;
2826   struct GNUNET_MessageHeader *message;
2827   
2828   if (0 != memcmp (peer,
2829                    &socket->other_peer,
2830                    sizeof (struct GNUNET_PeerIdentity)))
2831   {
2832     LOG (GNUNET_ERROR_TYPE_DEBUG,
2833          "%s: A peer which is not our target has connected to our tunnel\n",
2834          GNUNET_i2s(peer));
2835     return;
2836   }
2837   LOG (GNUNET_ERROR_TYPE_DEBUG,
2838        "%s: Target peer %s connected\n",
2839        GNUNET_i2s (&socket->other_peer),
2840        GNUNET_i2s (&socket->other_peer));
2841   /* Set state to INIT */
2842   socket->state = STATE_INIT;
2843   /* Send HELLO message */
2844   message = generate_hello (socket);
2845   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2846   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2847     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2848   socket->control_retransmission_task_id =
2849     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2850                                   &control_retransmission_task, socket);
2851 }
2852
2853
2854 /**
2855  * Function called when our target peer is disconnected from our tunnel
2856  *
2857  * @param cls the socket associated which this tunnel
2858  * @param peer the peer identity of the target
2859  */
2860 static void
2861 mesh_peer_disconnect_callback (void *cls,
2862                                const struct GNUNET_PeerIdentity *peer)
2863 {
2864   struct GNUNET_STREAM_Socket *socket=cls;
2865   
2866   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2867   LOG_DEBUG ("%1$s: Other peer %1$s disconnected \n",
2868              GNUNET_i2s (&socket->other_peer));
2869 }
2870
2871
2872 /**
2873  * Method called whenever a peer creates a tunnel to us
2874  *
2875  * @param cls closure
2876  * @param tunnel new handle to the tunnel
2877  * @param initiator peer that started the tunnel
2878  * @param atsi performance information for the tunnel
2879  * @return initial tunnel context for the tunnel
2880  *         (can be NULL -- that's not an error)
2881  */
2882 static void *
2883 new_tunnel_notify (void *cls,
2884                    struct GNUNET_MESH_Tunnel *tunnel,
2885                    const struct GNUNET_PeerIdentity *initiator,
2886                    const struct GNUNET_ATS_Information *atsi)
2887 {
2888   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2889   struct GNUNET_STREAM_Socket *socket;
2890
2891   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2892      from the same peer again until the socket is closed */
2893   if (GNUNET_NO == lsocket->listening)
2894   {
2895     GNUNET_MESH_tunnel_destroy (tunnel);
2896     return NULL;
2897   }
2898   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2899   socket->other_peer = *initiator;
2900   socket->tunnel = tunnel;
2901   socket->state = STATE_INIT;
2902   socket->lsocket = lsocket;
2903   socket->port = lsocket->port;
2904   socket->stat_handle = lsocket->stat_handle;
2905   socket->retransmit_timeout = lsocket->retransmit_timeout;
2906   socket->testing_active = lsocket->testing_active;
2907   socket->testing_set_write_sequence_number_value =
2908       lsocket->testing_set_write_sequence_number_value;
2909   socket->max_payload_size = lsocket->max_payload_size;
2910   LOG_DEBUG ("%1$s: Peer %1$s initiated tunnel to us\n",
2911              GNUNET_i2s (&socket->other_peer));
2912   if (NULL != socket->stat_handle)
2913   {
2914     GNUNET_STATISTICS_update (socket->stat_handle,
2915                               "total inbound connections received",
2916                               1, GNUNET_NO);
2917     GNUNET_STATISTICS_update (socket->stat_handle,
2918                               "inbound connections", 1, GNUNET_NO);
2919   }
2920   return socket;
2921 }
2922
2923
2924 /**
2925  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2926  * any associated state.  This function is NOT called if the client has
2927  * explicitly asked for the tunnel to be destroyed using
2928  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2929  * the tunnel.
2930  *
2931  * @param cls closure (set from GNUNET_MESH_connect)
2932  * @param tunnel connection to the other end (henceforth invalid)
2933  * @param tunnel_ctx place where local state associated
2934  *                   with the tunnel is stored
2935  */
2936 static void 
2937 tunnel_cleaner (void *cls,
2938                 const struct GNUNET_MESH_Tunnel *tunnel,
2939                 void *tunnel_ctx)
2940 {
2941   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2942   struct MessageQueue *head;
2943
2944   GNUNET_assert (tunnel == socket->tunnel);
2945   GNUNET_break_op(0);
2946   LOG (GNUNET_ERROR_TYPE_DEBUG,
2947        "%s: Peer %s has terminated connection abruptly\n",
2948        GNUNET_i2s (&socket->other_peer),
2949        GNUNET_i2s (&socket->other_peer));
2950   if (NULL != socket->stat_handle)
2951   {
2952     GNUNET_STATISTICS_update (socket->stat_handle,
2953                               "connections terminated abruptly", 1, GNUNET_NO);
2954     GNUNET_STATISTICS_update (socket->stat_handle,
2955                               "inbound connections", -1, GNUNET_NO);
2956   }
2957   /* Clear Transmit handles */
2958   if (NULL != socket->transmit_handle)
2959   {
2960     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2961     socket->transmit_handle = NULL;
2962   }
2963   /* Stop Tasks using socket->tunnel */
2964   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2965   {
2966     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2967     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2968   }
2969   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2970   {
2971     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2972     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2973   }  
2974   /* Terminate the control retransmission tasks */
2975   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2976   {
2977     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2978     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2979   }
2980   /* Clear existing message queue */
2981   while (NULL != (head = socket->queue_head)) {
2982     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2983                                  socket->queue_tail,
2984                                  head);
2985     GNUNET_free (head->message);
2986     GNUNET_free (head);
2987   }
2988   socket->tunnel = NULL;
2989 }
2990
2991
2992 /**
2993  * Callback to signal timeout on lockmanager lock acquire
2994  *
2995  * @param cls the ListenSocket
2996  * @param tc the scheduler task context
2997  */
2998 static void
2999 lockmanager_acquire_timeout (void *cls, 
3000                              const struct GNUNET_SCHEDULER_TaskContext *tc)
3001 {
3002   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
3003   GNUNET_STREAM_ListenCallback listen_cb;
3004   void *listen_cb_cls;
3005
3006   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
3007   listen_cb = lsocket->listen_cb;
3008   listen_cb_cls = lsocket->listen_cb_cls;
3009   if (NULL != listen_cb)
3010     listen_cb (listen_cb_cls, NULL, NULL);
3011 }
3012
3013
3014 /**
3015  * Callback to notify us on the status changes on app_port lock
3016  *
3017  * @param cls the ListenSocket
3018  * @param domain the domain name of the lock
3019  * @param lock the app_port
3020  * @param status the current status of the lock
3021  */
3022 static void
3023 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
3024                        enum GNUNET_LOCKMANAGER_Status status)
3025 {
3026   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
3027
3028   GNUNET_assert (lock == (uint32_t) lsocket->port);
3029   if (GNUNET_LOCKMANAGER_SUCCESS == status)
3030   {
3031     lsocket->listening = GNUNET_YES;
3032     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3033     {
3034       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3035       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
3036     }
3037     if (NULL == lsocket->mesh)
3038     {
3039       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
3040
3041       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
3042                                            lsocket, /* Closure */
3043                                            &new_tunnel_notify,
3044                                            &tunnel_cleaner,
3045                                            server_message_handlers,
3046                                            ports);
3047       GNUNET_assert (NULL != lsocket->mesh);
3048       if (NULL != lsocket->listen_ok_cb)
3049       {
3050         (void) lsocket->listen_ok_cb ();
3051       }
3052     }
3053   }
3054   if (GNUNET_LOCKMANAGER_RELEASE == status)
3055     lsocket->listening = GNUNET_NO;
3056 }
3057
3058
3059 /*****************/
3060 /* API functions */
3061 /*****************/
3062
3063
3064 /**
3065  * Tries to open a stream to the target peer
3066  *
3067  * @param cfg configuration to use
3068  * @param target the target peer to which the stream has to be opened
3069  * @param app_port the application port number which uniquely identifies this
3070  *            stream
3071  * @param open_cb this function will be called after stream has be established;
3072  *          cannot be NULL
3073  * @param open_cb_cls the closure for open_cb
3074  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3075  * @return if successful it returns the stream socket; NULL if stream cannot be
3076  *         opened 
3077  */
3078 struct GNUNET_STREAM_Socket *
3079 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3080                     const struct GNUNET_PeerIdentity *target,
3081                     GNUNET_MESH_ApplicationType app_port,
3082                     GNUNET_STREAM_OpenCallback open_cb,
3083                     void *open_cb_cls,
3084                     ...)
3085 {
3086   struct GNUNET_STREAM_Socket *socket;
3087   enum GNUNET_STREAM_Option option;
3088   va_list vargs;
3089   uint16_t payload_size;
3090
3091   LOG (GNUNET_ERROR_TYPE_DEBUG,
3092        "%s\n", __func__);
3093   GNUNET_assert (NULL != open_cb);
3094   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3095   socket->other_peer = *target;
3096   socket->open_cb = open_cb;
3097   socket->open_cls = open_cb_cls;
3098   socket->port = app_port;
3099   /* Set defaults */
3100   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3101   socket->testing_active = GNUNET_NO;
3102   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3103   va_start (vargs, open_cb_cls); /* Parse variable args */
3104   do {
3105     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3106     switch (option)
3107     {
3108     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3109       /* Expect struct GNUNET_TIME_Relative */
3110       socket->retransmit_timeout = va_arg (vargs,
3111                                            struct GNUNET_TIME_Relative);
3112       break;
3113     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3114       socket->testing_active = GNUNET_YES;
3115       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3116                                                                 uint32_t);
3117       break;
3118     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3119       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3120       break;
3121     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3122       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3123       break;
3124     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3125       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3126       GNUNET_assert (0 != payload_size);
3127       if (payload_size < socket->max_payload_size)
3128         socket->max_payload_size = payload_size;
3129       break;
3130     case GNUNET_STREAM_OPTION_END:
3131       break;
3132     }
3133   } while (GNUNET_STREAM_OPTION_END != option);
3134   va_end (vargs);               /* End of variable args parsing */
3135   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3136                                       socket, /* cls */
3137                                       NULL, /* No inbound tunnel handler */
3138                                       NULL, /* No in-tunnel cleaner */
3139                                       client_message_handlers,
3140                                       NULL); /* We don't get inbound tunnels */
3141   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3142   {
3143     GNUNET_free (socket);
3144     return NULL;
3145   }
3146   /* Now create the mesh tunnel to target */
3147   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3148   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3149                                               NULL, /* Tunnel context */
3150                                               &mesh_peer_connect_callback,
3151                                               &mesh_peer_disconnect_callback,
3152                                               socket);
3153   GNUNET_assert (NULL != socket->tunnel);
3154   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3155                                         &socket->other_peer);
3156   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3157   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3158   return socket;
3159 }
3160
3161
3162 /**
3163  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3164  *
3165  * @param socket the stream socket
3166  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3167  * @param completion_cb the callback that will be called upon successful
3168  *          shutdown of given operation
3169  * @param completion_cls the closure for the completion callback
3170  * @return the shutdown handle
3171  */
3172 struct GNUNET_STREAM_ShutdownHandle *
3173 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3174                         int operation,
3175                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3176                         void *completion_cls)
3177 {
3178   struct GNUNET_STREAM_ShutdownHandle *handle;
3179   struct GNUNET_MessageHeader *msg;
3180   
3181   GNUNET_assert (NULL == socket->shutdown_handle);
3182   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3183   handle->socket = socket;
3184   handle->completion_cb = completion_cb;
3185   handle->completion_cls = completion_cls;
3186   socket->shutdown_handle = handle;
3187   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3188        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3189        || ((GNUNET_YES == socket->transmit_closed) 
3190            && (GNUNET_YES == socket->receive_closed)
3191            && (SHUT_RDWR == operation)) )
3192   {
3193     handle->operation = operation;
3194     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3195                                                           socket);
3196     return handle;
3197   }
3198   msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
3199   msg->size = htons (sizeof (struct GNUNET_MessageHeader));
3200   switch (operation)
3201   {
3202   case SHUT_RD:
3203     handle->operation = SHUT_RD;
3204     if (NULL != socket->read_handle)
3205       LOG (GNUNET_ERROR_TYPE_WARNING,
3206            "Existing read handle should be cancelled before shutting"
3207            " down reading\n");
3208     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3209     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3210                    GNUNET_NO);
3211     socket->receive_closed = GNUNET_YES;
3212     break;
3213   case SHUT_WR:
3214     handle->operation = SHUT_WR;
3215     if (NULL != socket->write_handle)
3216       LOG (GNUNET_ERROR_TYPE_WARNING,
3217            "Existing write handle should be cancelled before shutting"
3218            " down writing\n");
3219     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3220     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3221                    GNUNET_NO);
3222     socket->transmit_closed = GNUNET_YES;
3223     break;
3224   case SHUT_RDWR:
3225     handle->operation = SHUT_RDWR;
3226     if (NULL != socket->write_handle)
3227       LOG (GNUNET_ERROR_TYPE_WARNING,
3228            "Existing write handle should be cancelled before shutting"
3229            " down writing\n");
3230     if (NULL != socket->read_handle)
3231       LOG (GNUNET_ERROR_TYPE_WARNING,
3232            "Existing read handle should be cancelled before shutting"
3233            " down reading\n");
3234     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3235     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3236     socket->transmit_closed = GNUNET_YES;
3237     socket->receive_closed = GNUNET_YES;
3238     break;
3239   default:
3240     LOG (GNUNET_ERROR_TYPE_WARNING,
3241          "GNUNET_STREAM_shutdown called with invalid value for "
3242          "parameter operation -- Ignoring\n");
3243     GNUNET_free (msg);
3244     GNUNET_free (handle);
3245     return NULL;
3246   }
3247   handle->close_msg_retransmission_task_id =
3248     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3249                                   &close_msg_retransmission_task,
3250                                   handle);
3251   return handle;
3252 }
3253
3254
3255 /**
3256  * Cancels a pending shutdown. Note that the shutdown messages may already be
3257  * sent and the stream is shutdown already for the operation given to
3258  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3259  * shutdown messages and frees the shutdown handle.
3260  *
3261  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3262  */
3263 void
3264 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3265 {
3266   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3267     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3268   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3269     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3270   handle->socket->shutdown_handle = NULL;
3271   GNUNET_free (handle);
3272 }
3273
3274
3275 /**
3276  * Closes the stream
3277  *
3278  * @param socket the stream socket
3279  */
3280 void
3281 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3282 {
3283   struct MessageQueue *head;
3284
3285   if (NULL != socket->read_handle)
3286   {
3287     LOG (GNUNET_ERROR_TYPE_WARNING,
3288          "Closing STREAM socket when a read handle is pending\n");
3289     GNUNET_STREAM_read_cancel (socket->read_handle);
3290   }
3291   if (NULL != socket->write_handle)
3292   {
3293     LOG (GNUNET_ERROR_TYPE_WARNING,
3294          "Closing STREAM socket when a write handle is pending\n");
3295     GNUNET_STREAM_write_cancel (socket->write_handle);
3296     //socket->write_handle = NULL;
3297   }
3298   /* Terminate the ack'ing task if they are still present */
3299   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3300   {
3301     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3302     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3303   }
3304   /* Terminate the control retransmission tasks */
3305   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3306   {
3307     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3308   }
3309   /* Clear Transmit handles */
3310   if (NULL != socket->transmit_handle)
3311   {
3312     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3313     socket->transmit_handle = NULL;
3314   }
3315   /* Clear existing message queue */
3316   while (NULL != (head = socket->queue_head)) {
3317     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3318                                  socket->queue_tail,
3319                                  head);
3320     GNUNET_free (head->message);
3321     GNUNET_free (head);
3322   }
3323   /* Close associated tunnel */
3324   if (NULL != socket->tunnel)
3325   {
3326     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3327     socket->tunnel = NULL;
3328   }
3329   /* Close mesh connection */
3330   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3331   {
3332     GNUNET_MESH_disconnect (socket->mesh);
3333     socket->mesh = NULL;
3334   }
3335   /* Close statistics connection */
3336   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3337     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3338   /* Release receive buffer */
3339   if (NULL != socket->receive_buffer)
3340   {
3341     GNUNET_free (socket->receive_buffer);
3342   }
3343   GNUNET_free (socket);
3344 }
3345
3346
3347 /**
3348  * Listens for stream connections for a specific application ports
3349  *
3350  * @param cfg the configuration to use
3351  * @param app_port the application port for which new streams will be accepted
3352  * @param listen_cb this function will be called when a peer tries to establish
3353  *            a stream with us
3354  * @param listen_cb_cls closure for listen_cb
3355  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3356  * @return listen socket, NULL for any error
3357  */
3358 struct GNUNET_STREAM_ListenSocket *
3359 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3360                       GNUNET_MESH_ApplicationType app_port,
3361                       GNUNET_STREAM_ListenCallback listen_cb,
3362                       void *listen_cb_cls,
3363                       ...)
3364 {
3365   struct GNUNET_STREAM_ListenSocket *lsocket;
3366   struct GNUNET_TIME_Relative listen_timeout;
3367   enum GNUNET_STREAM_Option option;
3368   va_list vargs;
3369   uint16_t payload_size;
3370
3371   GNUNET_assert (NULL != listen_cb);
3372   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3373   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3374   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3375   if (NULL == lsocket->lockmanager)
3376   {
3377     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3378     GNUNET_free (lsocket);
3379     return NULL;
3380   }
3381   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3382   /* Set defaults */
3383   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3384   lsocket->testing_active = GNUNET_NO;
3385   lsocket->listen_ok_cb = NULL;
3386   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3387   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3388   va_start (vargs, listen_cb_cls);
3389   do {
3390     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3391     switch (option)
3392     {
3393     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3394       lsocket->retransmit_timeout = va_arg (vargs,
3395                                             struct GNUNET_TIME_Relative);
3396       break;
3397     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3398       lsocket->testing_active = GNUNET_YES;
3399       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3400                                                                  uint32_t);
3401       break;
3402     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3403       listen_timeout = GNUNET_TIME_relative_multiply
3404         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3405       break;
3406     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3407       lsocket->listen_ok_cb = va_arg (vargs,
3408                                       GNUNET_STREAM_ListenSuccessCallback);
3409       break;
3410     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3411       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3412       GNUNET_assert (0 != payload_size);
3413       if (payload_size < lsocket->max_payload_size)
3414         lsocket->max_payload_size = payload_size;
3415       break;
3416     case GNUNET_STREAM_OPTION_END:
3417       break;
3418     }
3419   } while (GNUNET_STREAM_OPTION_END != option);
3420   va_end (vargs);
3421   lsocket->port = app_port;
3422   lsocket->listen_cb = listen_cb;
3423   lsocket->listen_cb_cls = listen_cb_cls;
3424   lsocket->locking_request = 
3425     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3426                                      (uint32_t) lsocket->port,
3427                                      &lock_status_change_cb, lsocket);
3428   lsocket->lockmanager_acquire_timeout_task =
3429     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3430                                   &lockmanager_acquire_timeout, lsocket);
3431   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3432                                                    lsocket->cfg);
3433   return lsocket;
3434 }
3435
3436
3437 /**
3438  * Closes the listen socket
3439  *
3440  * @param lsocket the listen socket
3441  */
3442 void
3443 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3444 {
3445   /* Close MESH connection */
3446   if (NULL != lsocket->mesh)
3447     GNUNET_MESH_disconnect (lsocket->mesh);
3448   if (NULL != lsocket->stat_handle)
3449     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3450   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3451   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3452     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3453   if (NULL != lsocket->locking_request)
3454     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3455   if (NULL != lsocket->lockmanager)
3456     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3457   GNUNET_free (lsocket);
3458 }
3459
3460
3461 /**
3462  * Tries to write the given data to the stream. The maximum size of data that
3463  * can be written per a write operation is ~ 4MB (64 * (64000 - sizeof (struct
3464  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3465  * violation, however only the said number of maximum bytes will be written.
3466  *
3467  * @param socket the socket representing a stream
3468  * @param data the data buffer from where the data is written into the stream
3469  * @param size the number of bytes to be written from the data buffer
3470  * @param timeout the timeout period
3471  * @param write_cont the function to call upon writing some bytes into the
3472  *          stream 
3473  * @param write_cont_cls the closure
3474  *
3475  * @return handle to cancel the operation; if a previous write is pending NULL
3476  *           is returned. If the stream has been shutdown for this operation or
3477  *           is broken then write_cont is immediately called and NULL is
3478  *           returned.
3479  */
3480 struct GNUNET_STREAM_WriteHandle *
3481 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3482                      const void *data,
3483                      size_t size,
3484                      struct GNUNET_TIME_Relative timeout,
3485                      GNUNET_STREAM_CompletionContinuation write_cont,
3486                      void *write_cont_cls)
3487 {
3488   struct GNUNET_STREAM_WriteHandle *io_handle;
3489   struct GNUNET_STREAM_DataMessage *dmsg;
3490   const void *sweep;
3491   struct GNUNET_TIME_Relative ack_deadline;
3492   unsigned int num_needed_packets;
3493   unsigned int cnt;
3494   uint32_t packet_size;
3495   uint32_t payload_size;
3496   uint16_t max_data_packet_size;
3497
3498   LOG (GNUNET_ERROR_TYPE_DEBUG,
3499        "%s\n", __func__);
3500   if (NULL != socket->write_handle)
3501   {
3502     GNUNET_break (0);
3503     return NULL;
3504   }
3505   if (NULL == socket->tunnel)
3506   {
3507     if (NULL != write_cont)
3508       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3509     return NULL;
3510   }
3511   switch (socket->state)
3512   {
3513   case STATE_TRANSMIT_CLOSED:
3514   case STATE_TRANSMIT_CLOSE_WAIT:
3515   case STATE_CLOSED:
3516   case STATE_CLOSE_WAIT:
3517     if (NULL != write_cont)
3518       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3519     LOG (GNUNET_ERROR_TYPE_DEBUG,
3520          "%s() END\n", __func__);
3521     return NULL;
3522   case STATE_INIT:
3523   case STATE_LISTEN:
3524   case STATE_HELLO_WAIT:
3525     if (NULL != write_cont)
3526       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3527     LOG (GNUNET_ERROR_TYPE_DEBUG,
3528          "%s() END\n", __func__);
3529     return NULL;
3530   case STATE_ESTABLISHED:
3531   case STATE_RECEIVE_CLOSED:
3532   case STATE_RECEIVE_CLOSE_WAIT:
3533     break;
3534   }
3535   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3536     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3537   num_needed_packets =
3538       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3539   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_WriteHandle));
3540   io_handle->socket = socket;
3541   io_handle->write_cont = write_cont;
3542   io_handle->write_cont_cls = write_cont_cls;
3543   io_handle->size = size;
3544   io_handle->packets_sent = 0;
3545   sweep = data;
3546   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3547      determined from RTT */
3548   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3549   /* Divide the given buffer into packets for sending */
3550   max_data_packet_size =
3551       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3552   io_handle->max_ack_base_num = socket->write_sequence_number;
3553   for (cnt=0; cnt < num_needed_packets; cnt++)
3554   {
3555     if ((cnt + 1) * socket->max_payload_size < size) 
3556     {
3557       payload_size = socket->max_payload_size;
3558       packet_size = max_data_packet_size;
3559     }
3560     else 
3561     {
3562       payload_size = size - (cnt * socket->max_payload_size);
3563       packet_size = payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3564     }
3565     dmsg = GNUNET_malloc (packet_size);    
3566     dmsg->header.size = htons (packet_size);
3567     dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3568     dmsg->sequence_number = htonl (socket->write_sequence_number++);
3569     dmsg->offset = htonl (socket->write_offset);
3570     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3571        determined from RTT */
3572     dmsg->ack_deadline = GNUNET_TIME_relative_hton (ack_deadline);
3573     /* Copy data from given buffer to the packet */
3574     memcpy (&dmsg[1], sweep, payload_size);
3575     io_handle->messages[cnt] = dmsg;
3576     sweep += payload_size;
3577     socket->write_offset += payload_size;
3578   }
3579   /* ack the last data message. FIXME: remove when we figure out how to do this
3580      using RTT */
3581   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3582       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3583   socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
3584   socket->write_handle = io_handle;
3585   write_data (socket);
3586   LOG (GNUNET_ERROR_TYPE_DEBUG,
3587        "%s() END\n", __func__);
3588   return io_handle;
3589 }
3590
3591
3592 /**
3593  * Function to check the ACK bitmap for any received messages and call the data processor
3594  *
3595  * @param cls the socket
3596  * @param tc the scheduler task context
3597  */
3598 static void
3599 probe_data_availability (void *cls,
3600                          const struct GNUNET_SCHEDULER_TaskContext *tc)
3601 {
3602   struct GNUNET_STREAM_Socket *socket = cls;
3603
3604   GNUNET_assert (NULL != socket->read_handle);
3605   socket->read_handle->probe_data_availability_task_id =
3606       GNUNET_SCHEDULER_NO_TASK;
3607   if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id)
3608     return;     /* A task to call read processor is present */
3609   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3610                                           0))
3611     socket->read_handle->read_task_id 
3612         = GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
3613 }
3614
3615
3616 /**
3617  * Tries to read data from the stream. Should not be called when another read
3618  * handle is present; the existing read handle should be canceled with
3619  * GNUNET_STREAM_read_cancel(). Only one read handle per socket is present at
3620  * any time
3621  *
3622  * @param socket the socket representing a stream
3623  * @param timeout the timeout period
3624  * @param proc function to call with data (once only)
3625  * @param proc_cls the closure for proc
3626  * @return handle to cancel the operation; NULL is returned if the stream has
3627  *           been shutdown for this type of opeartion (the DataProcessor is
3628  *           immediately called with GNUNET_STREAM_SHUTDOWN as status)
3629  */
3630 struct GNUNET_STREAM_ReadHandle *
3631 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3632                     struct GNUNET_TIME_Relative timeout,
3633                     GNUNET_STREAM_DataProcessor proc,
3634                     void *proc_cls)
3635 {
3636   struct GNUNET_STREAM_ReadHandle *read_handle;
3637   
3638   LOG (GNUNET_ERROR_TYPE_DEBUG,
3639        "%s: %s()\n", 
3640        GNUNET_i2s (&socket->other_peer),
3641        __func__);
3642   /* Only one read handle is permitted at any time; cancel the existing or wait
3643      for it to complete */
3644   GNUNET_assert (NULL == socket->read_handle);
3645   GNUNET_assert (NULL != proc);
3646   if (GNUNET_YES == socket->receive_closed)
3647     return NULL;
3648   switch (socket->state)
3649   {
3650   case STATE_RECEIVE_CLOSED:
3651   case STATE_RECEIVE_CLOSE_WAIT:
3652   case STATE_CLOSED:
3653   case STATE_CLOSE_WAIT:
3654     LOG (GNUNET_ERROR_TYPE_DEBUG,
3655          "%s: %s() END\n",
3656          GNUNET_i2s (&socket->other_peer),
3657          __func__);
3658     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3659     return NULL;
3660   default:
3661     break;
3662   }
3663   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ReadHandle));
3664   read_handle->proc = proc;
3665   read_handle->proc_cls = proc_cls;
3666   read_handle->socket = socket;
3667   socket->read_handle = read_handle;
3668   read_handle->probe_data_availability_task_id =
3669       GNUNET_SCHEDULER_add_now (&probe_data_availability, socket);
3670   read_handle->read_io_timeout_task_id =
3671       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3672   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3673        GNUNET_i2s (&socket->other_peer), __func__);
3674   return read_handle;
3675 }
3676
3677
3678 /**
3679  * Cancels pending write operation. Also cancels packet retransmissions which
3680  * may have resulted otherwise.
3681  *
3682  * CAUTION: Normally a write operation is considered successful if the data
3683  * given to it is sent and acknowledged by the receiver. As data is divided
3684  * into packets, it is possible that not all packets are received by the
3685  * receiver. Any missing packets are then retransmitted till the receiver
3686  * acknowledges all packets or until a timeout . During this scenario if the
3687  * write operation is cancelled all such retransmissions are also
3688  * cancelled. This may leave the receiver's receive buffer incompletely filled
3689  * as some missing packets are never retransmitted. So this operation should be
3690  * used before shutting down transmission from our side or before closing the
3691  * socket.
3692  *
3693  * @param wh write operation handle to cancel
3694  */
3695 void
3696 GNUNET_STREAM_write_cancel (struct GNUNET_STREAM_WriteHandle *wh)
3697 {
3698   struct GNUNET_STREAM_Socket *socket = wh->socket;
3699   unsigned int packet;
3700
3701   GNUNET_assert (NULL != socket->write_handle);
3702   GNUNET_assert (socket->write_handle == wh);
3703   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3704   {
3705     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3706     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3707   }
3708   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3709   {
3710     if (NULL == wh->messages[packet]) break;
3711     GNUNET_free (wh->messages[packet]);
3712   }      
3713   GNUNET_free (socket->write_handle);
3714   socket->write_handle = NULL;
3715 }
3716
3717
3718 /**
3719  * Cancel pending read operation.
3720  *
3721  * @param rh read operation handle to cancel
3722  */
3723 void
3724 GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
3725 {
3726   struct GNUNET_STREAM_Socket *socket;
3727   
3728   socket = rh->socket;
3729   GNUNET_assert (NULL != socket->read_handle);
3730   GNUNET_assert (rh == socket->read_handle);
3731   cleanup_read_handle (socket);
3732 }
3733
3734 /* end of stream_api.c */