- simplify
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_util_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_IOWriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_IOReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * Mesh transmit timeout
285    */
286   struct GNUNET_TIME_Relative mesh_retry_timeout;
287
288   /**
289    * Data retransmission timeout
290    */
291   struct GNUNET_TIME_Relative data_retransmit_timeout;
292
293   /**
294    * The state of the protocol associated with this socket
295    */
296   enum State state;
297
298   /**
299    * The status of the socket
300    */
301   enum GNUNET_STREAM_Status status;
302
303   /**
304    * Whether testing mode is active or not
305    */
306   int testing_active;
307
308   /**
309    * Is receive closed
310    */
311   int receive_closed;
312
313   /**
314    * Is transmission closed
315    */
316   int transmit_closed;
317
318   /**
319    * The application port number (type: uint32_t)
320    */
321   GNUNET_MESH_ApplicationType app_port;
322
323   /**
324    * The write sequence number to be set incase of testing
325    */
326   uint32_t testing_set_write_sequence_number_value;
327
328   /**
329    * Write sequence number. Set to random when sending HELLO(client) and
330    * HELLO_ACK(server) 
331    */
332   uint32_t write_sequence_number;
333
334   /**
335    * Read sequence number. This number's value is determined during handshake
336    */
337   uint32_t read_sequence_number;
338
339   /**
340    * The receiver buffer size
341    */
342   uint32_t receive_buffer_size;
343
344   /**
345    * The receiver buffer boundaries
346    */
347   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
348
349   /**
350    * receiver's available buffer after the last acknowledged packet
351    */
352   uint32_t receiver_window_available;
353
354   /**
355    * The offset pointer used during write operation
356    */
357   uint32_t write_offset;
358
359   /**
360    * The offset after which we are expecting data
361    */
362   uint32_t read_offset;
363
364   /**
365    * The offset upto which user has read from the received buffer
366    */
367   uint32_t copy_offset;
368
369   /**
370    * The maximum size of the data message payload this stream handle can send
371    */
372   uint16_t max_payload_size;
373 };
374
375
376 /**
377  * A socket for listening
378  */
379 struct GNUNET_STREAM_ListenSocket
380 {
381   /**
382    * The mesh handle
383    */
384   struct GNUNET_MESH_Handle *mesh;
385
386   /**
387    * Handle to statistics
388    */
389   struct GNUNET_STATISTICS_Handle *stat_handle;
390
391   /**
392    * Our configuration
393    */
394   struct GNUNET_CONFIGURATION_Handle *cfg;
395
396   /**
397    * Handle to the lock manager service
398    */
399   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
400
401   /**
402    * The active LockingRequest from lockmanager
403    */
404   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
405
406   /**
407    * Callback to call after acquring a lock and listening
408    */
409   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
410
411   /**
412    * The callback function which is called after successful opening socket
413    */
414   GNUNET_STREAM_ListenCallback listen_cb;
415
416   /**
417    * The call back closure
418    */
419   void *listen_cb_cls;
420
421   /**
422    * The service port
423    */
424   GNUNET_MESH_ApplicationType port;
425   
426   /**
427    * The id of the lockmanager timeout task
428    */
429   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
430
431   /**
432    * The retransmit timeout
433    */
434   struct GNUNET_TIME_Relative retransmit_timeout;
435   
436   /**
437    * Listen enabled?
438    */
439   int listening;
440
441   /**
442    * Whether testing mode is active or not
443    */
444   int testing_active;
445
446   /**
447    * The write sequence number to be set incase of testing
448    */
449   uint32_t testing_set_write_sequence_number_value;
450
451   /**
452    * The maximum size of the data message payload this stream handle can send
453    */
454   uint16_t max_payload_size;
455
456 };
457
458
459 /**
460  * The IO Write Handle
461  */
462 struct GNUNET_STREAM_IOWriteHandle
463 {
464   /**
465    * The socket to which this write handle is associated
466    */
467   struct GNUNET_STREAM_Socket *socket;
468
469   /**
470    * The packet_buffers associated with this Handle
471    */
472   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
473
474   /**
475    * The write continuation callback
476    */
477   GNUNET_STREAM_CompletionContinuation write_cont;
478
479   /**
480    * Write continuation closure
481    */
482   void *write_cont_cls;
483
484   /**
485    * The bitmap of this IOHandle; Corresponding bit for a message is set when
486    * it has been acknowledged by the receiver
487    */
488   GNUNET_STREAM_AckBitmap ack_bitmap;
489
490   /**
491    * Number of bytes in this write handle
492    */
493   size_t size;
494
495   /**
496    * Number of packets already transmitted from this IO handle. Retransmitted
497    * packets are not taken into account here. This is used to determine which
498    * packets account for retransmission and which packets occupy buffer space at
499    * the receiver.
500    */
501   unsigned int packets_sent;
502
503   /**
504    * The maximum of the base numbers of the received acks
505    */
506   uint32_t max_ack_base_num;
507
508 };
509
510
511 /**
512  * The IO Read Handle
513  */
514 struct GNUNET_STREAM_IOReadHandle
515 {
516   /**
517    * The socket to which this read handle is associated
518    */
519   struct GNUNET_STREAM_Socket *socket;
520   
521   /**
522    * Callback for the read processor
523    */
524   GNUNET_STREAM_DataProcessor proc;
525
526   /**
527    * The closure pointer for the read processor callback
528    */
529   void *proc_cls;
530
531   /**
532    * Task identifier for the read io timeout task
533    */
534   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
535
536   /**
537    * Task scheduled to continue a read operation.
538    */
539   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
540 };
541
542
543 /**
544  * Handle for Shutdown
545  */
546 struct GNUNET_STREAM_ShutdownHandle
547 {
548   /**
549    * The socket associated with this shutdown handle
550    */
551   struct GNUNET_STREAM_Socket *socket;
552
553   /**
554    * Shutdown completion callback
555    */
556   GNUNET_STREAM_ShutdownCompletion completion_cb;
557
558   /**
559    * Closure for completion callback
560    */
561   void *completion_cls;
562
563   /**
564    * Close message retransmission task id
565    */
566   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
567
568   /**
569    * Task scheduled to call the shutdown continuation callback
570    */
571   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
572
573   /**
574    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
575    */
576   int operation;  
577 };
578
579
580 /**
581  * Default value in seconds for various timeouts
582  */
583 static const unsigned int default_timeout = 10;
584
585 /**
586  * The domain name for locks we use here
587  */
588 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
589
590 /**
591  * Callback function for sending queued message
592  *
593  * @param cls closure the socket
594  * @param size number of bytes available in buf
595  * @param buf where the callee should write the message
596  * @return number of bytes written to buf
597  */
598 static size_t
599 send_message_notify (void *cls, size_t size, void *buf)
600 {
601   struct GNUNET_STREAM_Socket *socket = cls;
602   struct MessageQueue *head;
603   size_t ret;
604
605   socket->transmit_handle = NULL; /* Remove the transmit handle */
606   head = socket->queue_head;
607   if (NULL == head)
608     return 0; /* just to be safe */
609   if (0 == size)                /* request timed out */
610   {
611     socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
612         (socket->mesh_retry_timeout);    
613     LOG (GNUNET_ERROR_TYPE_DEBUG,
614          "%s: Message sending to MESH timed out. Retrying in %s \n",
615          GNUNET_i2s (&socket->other_peer),
616          GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
617                                                  GNUNET_YES));
618     socket->transmit_handle = 
619         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
620                                            GNUNET_NO, /* Corking */
621                                            socket->mesh_retry_timeout,
622                                            &socket->other_peer,
623                                            ntohs (head->message->header.size),
624                                            &send_message_notify,
625                                            socket);
626     return 0;
627   }
628   ret = ntohs (head->message->header.size);
629   GNUNET_assert (size >= ret);
630   memcpy (buf, head->message, ret);
631   if (NULL != head->finish_cb)
632   {
633     head->finish_cb (head->finish_cb_cls, socket);
634   }
635   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
636                                socket->queue_tail,
637                                head);
638   GNUNET_free (head->message);
639   GNUNET_free (head);
640   if (NULL != socket->transmit_handle)
641     return ret; /* 'finish_cb' might have triggered message already! */
642   head = socket->queue_head;
643   if (NULL != head)    /* more pending messages to send */
644   {
645     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
646     socket->transmit_handle = 
647         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
648                                            GNUNET_NO, /* Corking */
649                                            socket->mesh_retry_timeout,
650                                            &socket->other_peer,
651                                            ntohs (head->message->header.size),
652                                            &send_message_notify,
653                                            socket);
654   }
655   return ret;
656 }
657
658
659 /**
660  * Queues a message for sending using the mesh connection of a socket
661  *
662  * @param socket the socket whose mesh connection is used
663  * @param message the message to be sent
664  * @param finish_cb the callback to be called when the message is sent
665  * @param finish_cb_cls the closure for the callback
666  * @param urgent set to GNUNET_YES to add the message to the beginning of the
667  *          queue; GNUNET_NO to add at the tail
668  */
669 static void
670 queue_message (struct GNUNET_STREAM_Socket *socket,
671                struct GNUNET_STREAM_MessageHeader *message,
672                SendFinishCallback finish_cb,
673                void *finish_cb_cls,
674                int urgent)
675 {
676   struct MessageQueue *queue_entity;
677
678   GNUNET_assert 
679     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
680      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
681   LOG (GNUNET_ERROR_TYPE_DEBUG,
682        "%s: Queueing message of type %d and size %d\n",
683        GNUNET_i2s (&socket->other_peer),
684        ntohs (message->header.type),
685        ntohs (message->header.size));
686   GNUNET_assert (NULL != message);
687   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
688   queue_entity->message = message;
689   queue_entity->finish_cb = finish_cb;
690   queue_entity->finish_cb_cls = finish_cb_cls;
691   if (GNUNET_YES == urgent)
692   {
693     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
694                                  queue_entity);
695     if (NULL != socket->transmit_handle)
696     {
697       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
698       socket->transmit_handle = NULL;
699     }
700   }
701   else
702     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
703                                       socket->queue_tail,
704                                       queue_entity);
705   if (NULL == socket->transmit_handle)
706   {
707     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
708     socket->transmit_handle = 
709         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
710                                            GNUNET_NO, /* Corking */
711                                            socket->mesh_retry_timeout,
712                                            &socket->other_peer,
713                                            ntohs (message->header.size),
714                                            &send_message_notify,
715                                            socket);
716   }
717 }
718
719
720 /**
721  * Copies a message and queues it for sending using the mesh connection of
722  * given socket 
723  *
724  * @param socket the socket whose mesh connection is used
725  * @param message the message to be sent
726  * @param finish_cb the callback to be called when the message is sent
727  * @param finish_cb_cls the closure for the callback
728  */
729 static void
730 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
731                         const struct GNUNET_STREAM_MessageHeader *message,
732                         SendFinishCallback finish_cb,
733                         void *finish_cb_cls)
734 {
735   struct GNUNET_STREAM_MessageHeader *msg_copy;
736   uint16_t size;
737   
738   size = ntohs (message->header.size);
739   msg_copy = GNUNET_malloc (size);
740   memcpy (msg_copy, message, size);
741   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
742 }
743
744
745 /**
746  * Writes data using the given socket. The amount of data written is limited by
747  * the receiver_window_size
748  *
749  * @param socket the socket to use
750  */
751 static void 
752 write_data (struct GNUNET_STREAM_Socket *socket);
753
754
755 /**
756  * Task for retransmitting data messages if they aren't ACK before their ack
757  * deadline 
758  *
759  * @param cls the socket
760  * @param tc the Task context
761  */
762 static void
763 data_retransmission_task (void *cls,
764                           const struct GNUNET_SCHEDULER_TaskContext *tc)
765 {
766   struct GNUNET_STREAM_Socket *socket = cls;
767   
768   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
769   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
770     return;
771   LOG (GNUNET_ERROR_TYPE_DEBUG,
772        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
773   write_data (socket);
774 }
775
776
777 /**
778  * Task for sending ACK message
779  *
780  * @param cls the socket
781  * @param tc the Task context
782  */
783 static void
784 ack_task (void *cls,
785           const struct GNUNET_SCHEDULER_TaskContext *tc)
786 {
787   struct GNUNET_STREAM_Socket *socket = cls;
788   struct GNUNET_STREAM_AckMessage *ack_msg;
789
790   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
791   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
792     return;
793   /* Create the ACK Message */
794   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
795   ack_msg->header.header.size = htons (sizeof (struct 
796                                                GNUNET_STREAM_AckMessage));
797   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
798   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
799   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
800   ack_msg->receive_window_remaining = 
801     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
802   /* Queue up ACK for immediate sending */
803   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
804 }
805
806
807 /**
808  * Retransmission task for shutdown messages
809  *
810  * @param cls the shutdown handle
811  * @param tc the Task Context
812  */
813 static void
814 close_msg_retransmission_task (void *cls,
815                                const struct GNUNET_SCHEDULER_TaskContext *tc)
816 {
817   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
818   struct GNUNET_STREAM_MessageHeader *msg;
819   struct GNUNET_STREAM_Socket *socket;
820
821   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
822   GNUNET_assert (NULL != shutdown_handle);
823   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
824     return;
825   socket = shutdown_handle->socket;
826   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
827   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
828   switch (shutdown_handle->operation)
829   {
830   case SHUT_RDWR:
831     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
832     break;
833   case SHUT_RD:
834     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
835     break;
836   case SHUT_WR:
837     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
838     break;
839   default:
840     GNUNET_free (msg);
841     shutdown_handle->close_msg_retransmission_task_id = 
842       GNUNET_SCHEDULER_NO_TASK;
843     return;
844   }
845   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
846   shutdown_handle->close_msg_retransmission_task_id =
847     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
848                                   &close_msg_retransmission_task,
849                                   shutdown_handle);
850 }
851
852
853 /**
854  * Function to modify a bit in GNUNET_STREAM_AckBitmap
855  *
856  * @param bitmap the bitmap to modify
857  * @param bit the bit number to modify
858  * @param value GNUNET_YES to on, GNUNET_NO to off
859  */
860 static void
861 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
862                       unsigned int bit, 
863                       int value)
864 {
865   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
866   if (GNUNET_YES == value)
867     *bitmap |= (1LL << bit);
868   else
869     *bitmap &= ~(1LL << bit);
870 }
871
872
873 /**
874  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
875  *
876  * @param bitmap address of the bitmap that has to be checked
877  * @param bit the bit number to check
878  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
879  */
880 static uint8_t
881 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
882                       unsigned int bit)
883 {
884   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
885   return 0 != (*bitmap & (1LL << bit));
886 }
887
888
889 /**
890  * Writes data using the given socket. The amount of data written is limited by
891  * the receiver_window_size
892  *
893  * @param socket the socket to use
894  */
895 static void 
896 write_data (struct GNUNET_STREAM_Socket *socket)
897 {
898   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
899   unsigned int packet;
900   
901   for (packet=0; packet < io_handle->packets_sent; packet++)
902   {
903     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
904                                            packet))
905     {
906       LOG (GNUNET_ERROR_TYPE_DEBUG,
907            "%s: Retransmitting DATA message with sequence %u\n",
908            GNUNET_i2s (&socket->other_peer),
909            ntohl (io_handle->messages[packet]->sequence_number));
910       copy_and_queue_message (socket,
911                               &io_handle->messages[packet]->header,
912                               NULL,
913                               NULL);
914     }
915   }
916   /* Now send new packets if there is enough buffer space */
917   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
918          (NULL != io_handle->messages[packet]) &&
919          (socket->receiver_window_available 
920           >= ntohs (io_handle->messages[packet]->header.header.size)))
921   {
922     socket->receiver_window_available -= 
923       ntohs (io_handle->messages[packet]->header.header.size);
924     LOG (GNUNET_ERROR_TYPE_DEBUG,
925          "%s: Placing DATA message with sequence %u in send queue\n",
926          GNUNET_i2s (&socket->other_peer),
927          ntohl (io_handle->messages[packet]->sequence_number));
928     copy_and_queue_message (socket,
929                             &io_handle->messages[packet]->header,
930                             NULL,
931                             NULL);
932     packet++;
933   }
934   io_handle->packets_sent = packet;
935   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
936   {
937     socket->data_retransmit_timeout = GNUNET_TIME_STD_BACKOFF
938         (socket->data_retransmit_timeout);
939     socket->data_retransmission_task_id = 
940         GNUNET_SCHEDULER_add_delayed (socket->data_retransmit_timeout,
941                                       &data_retransmission_task,
942                                       socket);
943   }
944 }
945
946
947 /**
948  * Task for calling the read processor
949  *
950  * @param cls the socket
951  * @param tc the task context
952  */
953 static void
954 call_read_processor (void *cls,
955                      const struct GNUNET_SCHEDULER_TaskContext *tc)
956 {
957   struct GNUNET_STREAM_Socket *socket = cls;
958   struct GNUNET_STREAM_IOReadHandle *read_handle;
959   size_t read_size;
960   size_t valid_read_size;
961   unsigned int packet;
962   uint32_t sequence_increase;
963   uint32_t offset_increase;
964
965   read_handle = socket->read_handle;
966   GNUNET_assert (NULL != read_handle);
967   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
968   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
969     return;
970   if (NULL == socket->receive_buffer) 
971     return;
972   GNUNET_assert (NULL != socket->read_handle);
973   GNUNET_assert (NULL != socket->read_handle->proc);
974   /* Check the bitmap for any holes */
975   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
976   {
977     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
978                                            packet))
979       break;
980   }
981   /* We only call read processor if we have the first packet */
982   GNUNET_assert (0 < packet);
983   valid_read_size = 
984     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
985   GNUNET_assert (0 != valid_read_size);
986   /* Cancel the read_io_timeout_task */
987   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
988   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
989   /* Call the data processor */
990   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
991        GNUNET_i2s (&socket->other_peer));
992   read_size = 
993       socket->read_handle->proc (socket->read_handle->proc_cls,
994                                  socket->status,
995                                  socket->receive_buffer + socket->copy_offset,
996                                  valid_read_size);
997   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
998        GNUNET_i2s (&socket->other_peer), read_size);
999   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
1000        GNUNET_i2s (&socket->other_peer));
1001   /* Free the read handle */
1002   GNUNET_free (socket->read_handle);
1003   socket->read_handle = NULL;
1004   GNUNET_assert (read_size <= valid_read_size);
1005   socket->copy_offset += read_size;
1006   /* Determine upto which packet we can remove from the buffer */
1007   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1008   {
1009     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1010     { 
1011       packet++; 
1012       break;
1013     }
1014     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1015       break;
1016   }
1017   /* If no packets can be removed we can't move the buffer */
1018   if (0 == packet) 
1019     return;
1020   sequence_increase = packet;
1021   LOG (GNUNET_ERROR_TYPE_DEBUG,
1022        "%s: Sequence increase after read processor completion: %u\n",
1023        GNUNET_i2s (&socket->other_peer), sequence_increase);
1024   /* Shift the data in the receive buffer */
1025   socket->receive_buffer = 
1026     memmove (socket->receive_buffer,
1027              socket->receive_buffer 
1028              + socket->receive_buffer_boundaries[sequence_increase-1],
1029              socket->receive_buffer_size
1030              - socket->receive_buffer_boundaries[sequence_increase-1]);
1031   /* Shift the bitmap */
1032   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1033   /* Set read_sequence_number */
1034   socket->read_sequence_number += sequence_increase;
1035   /* Set read_offset */
1036   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1037   socket->read_offset += offset_increase;
1038   /* Fix copy_offset */
1039   GNUNET_assert (offset_increase <= socket->copy_offset);
1040   socket->copy_offset -= offset_increase;
1041   /* Fix relative boundaries */
1042   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1043   {
1044     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1045     {
1046       uint32_t ahead_buffer_boundary;
1047
1048       ahead_buffer_boundary = 
1049         socket->receive_buffer_boundaries[packet + sequence_increase];
1050       if (0 == ahead_buffer_boundary)
1051         socket->receive_buffer_boundaries[packet] = 0;
1052       else
1053       {
1054         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1055         socket->receive_buffer_boundaries[packet] = 
1056           ahead_buffer_boundary - offset_increase;
1057       }
1058     }
1059     else
1060       socket->receive_buffer_boundaries[packet] = 0;
1061   }
1062 }
1063
1064
1065 /**
1066  * Cancels the existing read io handle
1067  *
1068  * @param cls the closure from the SCHEDULER call
1069  * @param tc the task context
1070  */
1071 static void
1072 read_io_timeout (void *cls,
1073                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1074 {
1075   struct GNUNET_STREAM_Socket *socket = cls;
1076   struct GNUNET_STREAM_IOReadHandle *read_handle;
1077   GNUNET_STREAM_DataProcessor proc;
1078   void *proc_cls;
1079
1080   read_handle = socket->read_handle;
1081   GNUNET_assert (NULL != read_handle);
1082   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1083   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1084     return;
1085   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1086   {
1087     LOG (GNUNET_ERROR_TYPE_DEBUG,
1088          "%s: Read task timedout - Cancelling it\n",
1089          GNUNET_i2s (&socket->other_peer));
1090     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1091     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1092   }
1093   proc = read_handle->proc;
1094   proc_cls = read_handle->proc_cls;
1095   GNUNET_free (read_handle);
1096   socket->read_handle = NULL;
1097   /* Call the read processor to signal timeout */
1098   proc (proc_cls,
1099         GNUNET_STREAM_TIMEOUT,
1100         NULL,
1101         0);
1102 }
1103
1104
1105 /**
1106  * Handler for DATA messages; Same for both client and server
1107  *
1108  * @param socket the socket through which the ack was received
1109  * @param tunnel connection to the other end
1110  * @param sender who sent the message
1111  * @param msg the data message
1112  * @param atsi performance data for the connection
1113  * @return GNUNET_OK to keep the connection open,
1114  *         GNUNET_SYSERR to close it (signal serious error)
1115  */
1116 static int
1117 handle_data (struct GNUNET_STREAM_Socket *socket,
1118              struct GNUNET_MESH_Tunnel *tunnel,
1119              const struct GNUNET_PeerIdentity *sender,
1120              const struct GNUNET_STREAM_DataMessage *msg,
1121              const struct GNUNET_ATS_Information*atsi)
1122 {
1123   const void *payload;
1124   struct GNUNET_TIME_Relative ack_deadline_rel;
1125   uint32_t bytes_needed;
1126   uint32_t relative_offset;
1127   uint32_t relative_sequence_number;
1128   uint16_t size;
1129
1130   size = htons (msg->header.header.size);
1131   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1132   {
1133     GNUNET_break_op (0);
1134     return GNUNET_SYSERR;
1135   }
1136   if (0 != memcmp (sender, &socket->other_peer,
1137                    sizeof (struct GNUNET_PeerIdentity)))
1138   {
1139     LOG (GNUNET_ERROR_TYPE_DEBUG,
1140          "%s: Received DATA from non-confirming peer\n",
1141          GNUNET_i2s (&socket->other_peer));
1142     return GNUNET_YES;
1143   }
1144   switch (socket->state)
1145   {
1146   case STATE_ESTABLISHED:
1147   case STATE_TRANSMIT_CLOSED:
1148   case STATE_TRANSMIT_CLOSE_WAIT:      
1149     /* check if the message's sequence number is in the range we are
1150        expecting */
1151     relative_sequence_number = 
1152       ntohl (msg->sequence_number) - socket->read_sequence_number;
1153     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1154     {
1155       LOG (GNUNET_ERROR_TYPE_DEBUG,
1156            "%s: Ignoring received message with sequence number %u\n",
1157            GNUNET_i2s (&socket->other_peer),
1158            ntohl (msg->sequence_number));
1159       /* Start ACK sending task if one is not already present */
1160       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1161       {
1162         socket->ack_task_id = 
1163           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1164                                         (msg->ack_deadline),
1165                                         &ack_task,
1166                                         socket);
1167       }
1168       return GNUNET_YES;
1169     }      
1170     /* Check if we have already seen this message */
1171     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1172                                             relative_sequence_number))
1173     {
1174       LOG (GNUNET_ERROR_TYPE_DEBUG,
1175            "%s: Ignoring already received message with sequence number %u\n",
1176            GNUNET_i2s (&socket->other_peer),
1177            ntohl (msg->sequence_number));
1178       /* Start ACK sending task if one is not already present */
1179       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1180       {
1181         socket->ack_task_id = 
1182           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1183                                         (msg->ack_deadline), &ack_task, socket);
1184       }
1185       return GNUNET_YES;
1186     }
1187     LOG (GNUNET_ERROR_TYPE_DEBUG,
1188          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1189          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1190          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1191     /* Check if we have to allocate the buffer */
1192     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1193     relative_offset = ntohl (msg->offset) - socket->read_offset;
1194     bytes_needed = relative_offset + size;
1195     if (bytes_needed > socket->receive_buffer_size)
1196     {
1197       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1198       {
1199         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1200                                                  bytes_needed);
1201         socket->receive_buffer_size = bytes_needed;
1202       }
1203       else
1204       {
1205         LOG (GNUNET_ERROR_TYPE_DEBUG,
1206              "%s: Cannot accommodate packet %d as buffer is full\n",
1207              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1208         return GNUNET_YES;
1209       }
1210     }
1211     /* Copy Data to buffer */
1212     payload = &msg[1];
1213     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1214     memcpy (socket->receive_buffer + relative_offset, payload, size);
1215     socket->receive_buffer_boundaries[relative_sequence_number] = 
1216         relative_offset + size;
1217     /* Modify the ACK bitmap */
1218     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1219                           GNUNET_YES);
1220     /* Start ACK sending task if one is not already present */
1221     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1222     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1223     {
1224       ack_deadline_rel = 
1225           GNUNET_TIME_relative_min (ack_deadline_rel,
1226                                     GNUNET_TIME_relative_multiply
1227                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1228       socket->ack_task_id = 
1229           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1230                                         (msg->ack_deadline), &ack_task, socket);
1231       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1232       socket->ack_time_deadline = ack_deadline_rel;
1233     }
1234     else
1235     {
1236       struct GNUNET_TIME_Relative ack_time_past;
1237       struct GNUNET_TIME_Relative ack_time_remaining;
1238       struct GNUNET_TIME_Relative ack_time_min;
1239       ack_time_past = 
1240           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1241       ack_time_remaining = GNUNET_TIME_relative_subtract
1242           (socket->ack_time_deadline, ack_time_past);
1243       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1244                                                ack_deadline_rel);
1245       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1246                       sizeof (struct GNUNET_TIME_Relative)))
1247       {
1248         ack_deadline_rel = ack_time_min;
1249         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1250         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1251                                                             &ack_task, socket);
1252         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1253         socket->ack_time_deadline = ack_deadline_rel;
1254       }
1255     }
1256     if ((NULL != socket->read_handle) /* A read handle is waiting */
1257         /* There is no current read task */
1258         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1259         /* We have the first packet */
1260         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1261     {
1262       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1263            GNUNET_i2s (&socket->other_peer));
1264       socket->read_handle->read_task_id =
1265           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1266     }
1267     break;
1268   default:
1269     LOG (GNUNET_ERROR_TYPE_DEBUG,
1270          "%s: Received data message when it cannot be handled\n",
1271          GNUNET_i2s (&socket->other_peer));
1272     break;
1273   }
1274   return GNUNET_YES;
1275 }
1276
1277
1278 /**
1279  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1280  *
1281  * @param cls the socket (set from GNUNET_MESH_connect)
1282  * @param tunnel connection to the other end
1283  * @param tunnel_ctx place to store local state associated with the tunnel
1284  * @param sender who sent the message
1285  * @param message the actual message
1286  * @param atsi performance data for the connection
1287  * @return GNUNET_OK to keep the connection open,
1288  *         GNUNET_SYSERR to close it (signal serious error)
1289  */
1290 static int
1291 client_handle_data (void *cls,
1292                     struct GNUNET_MESH_Tunnel *tunnel,
1293                     void **tunnel_ctx,
1294                     const struct GNUNET_PeerIdentity *sender,
1295                     const struct GNUNET_MessageHeader *message,
1296                     const struct GNUNET_ATS_Information*atsi)
1297 {
1298   struct GNUNET_STREAM_Socket *socket = cls;
1299
1300   return handle_data (socket, tunnel, sender, 
1301                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1302 }
1303
1304
1305 /**
1306  * Callback to set state to ESTABLISHED
1307  *
1308  * @param cls the closure NULL;
1309  * @param socket the socket to requiring state change
1310  */
1311 static void
1312 set_state_established (void *cls,
1313                        struct GNUNET_STREAM_Socket *socket)
1314 {
1315   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1316        "%s: Attaining ESTABLISHED state\n",
1317        GNUNET_i2s (&socket->other_peer));
1318   socket->write_offset = 0;
1319   socket->read_offset = 0;
1320   socket->state = STATE_ESTABLISHED;
1321   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1322                  socket->control_retransmission_task_id);
1323   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1324   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1325   if (NULL != socket->lsocket)
1326   {
1327     LOG (GNUNET_ERROR_TYPE_DEBUG,
1328          "%s: Calling listen callback\n",
1329          GNUNET_i2s (&socket->other_peer));
1330     if (GNUNET_SYSERR == 
1331         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1332                                     socket,
1333                                     &socket->other_peer))
1334     {
1335       socket->state = STATE_CLOSED;
1336       /* FIXME: We should close in a decent way (send RST) */
1337       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1338       GNUNET_free (socket);
1339     }
1340   }
1341   else
1342     socket->open_cb (socket->open_cls, socket);
1343 }
1344
1345
1346 /**
1347  * Callback to set state to HELLO_WAIT
1348  *
1349  * @param cls the closure from queue_message
1350  * @param socket the socket to requiring state change
1351  */
1352 static void
1353 set_state_hello_wait (void *cls,
1354                       struct GNUNET_STREAM_Socket *socket)
1355 {
1356   GNUNET_assert (STATE_INIT == socket->state);
1357   LOG (GNUNET_ERROR_TYPE_DEBUG,
1358        "%s: Attaining HELLO_WAIT state\n",
1359        GNUNET_i2s (&socket->other_peer));
1360   socket->state = STATE_HELLO_WAIT;
1361 }
1362
1363
1364 /**
1365  * Callback to set state to CLOSE_WAIT
1366  *
1367  * @param cls the closure from queue_message
1368  * @param socket the socket requiring state change
1369  */
1370 static void
1371 set_state_close_wait (void *cls,
1372                       struct GNUNET_STREAM_Socket *socket)
1373 {
1374   LOG (GNUNET_ERROR_TYPE_DEBUG,
1375        "%s: Attaing CLOSE_WAIT state\n",
1376        GNUNET_i2s (&socket->other_peer));
1377   socket->state = STATE_CLOSE_WAIT;
1378   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1379   socket->receive_buffer = NULL;
1380   socket->receive_buffer_size = 0;
1381 }
1382
1383
1384 /**
1385  * Callback to set state to RECEIVE_CLOSE_WAIT
1386  *
1387  * @param cls the closure from queue_message
1388  * @param socket the socket requiring state change
1389  */
1390 static void
1391 set_state_receive_close_wait (void *cls,
1392                               struct GNUNET_STREAM_Socket *socket)
1393 {
1394   LOG (GNUNET_ERROR_TYPE_DEBUG,
1395        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1396        GNUNET_i2s (&socket->other_peer));
1397   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1398   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1399   socket->receive_buffer = NULL;
1400   socket->receive_buffer_size = 0;
1401 }
1402
1403
1404 /**
1405  * Callback to set state to TRANSMIT_CLOSE_WAIT
1406  *
1407  * @param cls the closure from queue_message
1408  * @param socket the socket requiring state change
1409  */
1410 static void
1411 set_state_transmit_close_wait (void *cls,
1412                                struct GNUNET_STREAM_Socket *socket)
1413 {
1414   LOG (GNUNET_ERROR_TYPE_DEBUG,
1415        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1416        GNUNET_i2s (&socket->other_peer));
1417   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1418 }
1419
1420
1421 /**
1422  * Callback to set state to CLOSED
1423  *
1424  * @param cls the closure from queue_message
1425  * @param socket the socket requiring state change
1426  */
1427 static void
1428 set_state_closed (void *cls,
1429                   struct GNUNET_STREAM_Socket *socket)
1430 {
1431   socket->state = STATE_CLOSED;
1432 }
1433
1434
1435 /**
1436  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1437  *
1438  * @return the generate hello message
1439  */
1440 static struct GNUNET_STREAM_MessageHeader *
1441 generate_hello (void)
1442 {
1443   struct GNUNET_STREAM_MessageHeader *msg;
1444
1445   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1446   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1447   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1448   return msg;
1449 }
1450
1451
1452 /**
1453  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1454  * socket
1455  *
1456  * @param socket the socket for which this HelloAckMessage has to be generated
1457  * @param generate_seq GNUNET_YES to generate the write sequence number,
1458  *          GNUNET_NO to use the existing sequence number
1459  * @return the HelloAckMessage
1460  */
1461 static struct GNUNET_STREAM_HelloAckMessage *
1462 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1463                     int generate_seq)
1464 {
1465   struct GNUNET_STREAM_HelloAckMessage *msg;
1466
1467   if (GNUNET_YES == generate_seq)
1468   {
1469     if (GNUNET_YES == socket->testing_active)
1470       socket->write_sequence_number =
1471         socket->testing_set_write_sequence_number_value;
1472     else
1473       socket->write_sequence_number = 
1474         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1475     LOG_DEBUG ("%s: write sequence number %u\n",
1476                GNUNET_i2s (&socket->other_peer),
1477                (unsigned int) socket->write_sequence_number);
1478   }
1479   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1480   msg->header.header.size = 
1481     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1482   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1483   msg->sequence_number = htonl (socket->write_sequence_number);
1484   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1485   return msg;
1486 }
1487
1488
1489 /**
1490  * Task for retransmitting control messages if they aren't ACK'ed before a
1491  * deadline
1492  *
1493  * @param cls the socket
1494  * @param tc the Task context
1495  */
1496 static void
1497 control_retransmission_task (void *cls,
1498                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1499 {
1500   struct GNUNET_STREAM_Socket *socket = cls;
1501     
1502   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1503   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1504     return;
1505   LOG_DEBUG ("%s: Retransmitting a control message\n",
1506                  GNUNET_i2s (&socket->other_peer));
1507   switch (socket->state)
1508   {
1509   case STATE_INIT:    
1510     GNUNET_break (0);
1511     break;
1512   case STATE_LISTEN:
1513     GNUNET_break (0);
1514     break;
1515   case STATE_HELLO_WAIT:
1516     if (NULL == socket->lsocket) /* We are client */
1517       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1518     else
1519       queue_message (socket,
1520                      (struct GNUNET_STREAM_MessageHeader *)
1521                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1522                      GNUNET_NO);
1523     socket->control_retransmission_task_id =
1524     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1525                                   &control_retransmission_task, socket);
1526     break;
1527   case STATE_ESTABLISHED:
1528     if (NULL == socket->lsocket)
1529       queue_message (socket,
1530                      (struct GNUNET_STREAM_MessageHeader *)
1531                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1532                      GNUNET_NO);
1533     else
1534       GNUNET_break (0);
1535     break;
1536   default:
1537     GNUNET_break (0);
1538   }  
1539 }
1540
1541
1542 /**
1543  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1544  *
1545  * @param cls the socket (set from GNUNET_MESH_connect)
1546  * @param tunnel connection to the other end
1547  * @param tunnel_ctx this is NULL
1548  * @param sender who sent the message
1549  * @param message the actual message
1550  * @param atsi performance data for the connection
1551  * @return GNUNET_OK to keep the connection open,
1552  *         GNUNET_SYSERR to close it (signal serious error)
1553  */
1554 static int
1555 client_handle_hello_ack (void *cls,
1556                          struct GNUNET_MESH_Tunnel *tunnel,
1557                          void **tunnel_ctx,
1558                          const struct GNUNET_PeerIdentity *sender,
1559                          const struct GNUNET_MessageHeader *message,
1560                          const struct GNUNET_ATS_Information*atsi)
1561 {
1562   struct GNUNET_STREAM_Socket *socket = cls;
1563   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1564   struct GNUNET_STREAM_HelloAckMessage *reply;
1565
1566   if (0 != memcmp (sender, &socket->other_peer,
1567                    sizeof (struct GNUNET_PeerIdentity)))
1568   {
1569     LOG (GNUNET_ERROR_TYPE_DEBUG,
1570          "%s: Received HELLO_ACK from non-confirming peer\n",
1571          GNUNET_i2s (&socket->other_peer));
1572     return GNUNET_YES;
1573   }
1574   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1575   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1576        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1577   GNUNET_assert (socket->tunnel == tunnel);
1578   switch (socket->state)
1579   {
1580   case STATE_HELLO_WAIT:
1581     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1582     LOG (GNUNET_ERROR_TYPE_DEBUG,
1583          "%s: Read sequence number %u\n",
1584          GNUNET_i2s (&socket->other_peer),
1585          (unsigned int) socket->read_sequence_number);
1586     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1587     reply = generate_hello_ack (socket, GNUNET_YES);
1588     queue_message (socket, &reply->header, &set_state_established,
1589                    NULL, GNUNET_NO);    
1590     return GNUNET_OK;
1591   case STATE_ESTABLISHED:
1592     // call statistics (# ACKs ignored++)
1593     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1594                    socket->control_retransmission_task_id);
1595     socket->control_retransmission_task_id =
1596       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1597     return GNUNET_OK;
1598   default:
1599     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1600                GNUNET_i2s (&socket->other_peer),
1601                GNUNET_i2s (&socket->other_peer), socket->state);
1602     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1603     return GNUNET_SYSERR;
1604   }
1605 }
1606
1607
1608 /**
1609  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1610  *
1611  * @param cls the socket (set from GNUNET_MESH_connect)
1612  * @param tunnel connection to the other end
1613  * @param tunnel_ctx this is NULL
1614  * @param sender who sent the message
1615  * @param message the actual message
1616  * @param atsi performance data for the connection
1617  * @return GNUNET_OK to keep the connection open,
1618  *         GNUNET_SYSERR to close it (signal serious error)
1619  */
1620 static int
1621 client_handle_reset (void *cls,
1622                      struct GNUNET_MESH_Tunnel *tunnel,
1623                      void **tunnel_ctx,
1624                      const struct GNUNET_PeerIdentity *sender,
1625                      const struct GNUNET_MessageHeader *message,
1626                      const struct GNUNET_ATS_Information*atsi)
1627 {
1628   // struct GNUNET_STREAM_Socket *socket = cls;
1629
1630   return GNUNET_OK;
1631 }
1632
1633
1634 /**
1635  * Common message handler for handling TRANSMIT_CLOSE messages
1636  *
1637  * @param socket the socket through which the ack was received
1638  * @param tunnel connection to the other end
1639  * @param sender who sent the message
1640  * @param msg the transmit close message
1641  * @param atsi performance data for the connection
1642  * @return GNUNET_OK to keep the connection open,
1643  *         GNUNET_SYSERR to close it (signal serious error)
1644  */
1645 static int
1646 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1647                        struct GNUNET_MESH_Tunnel *tunnel,
1648                        const struct GNUNET_PeerIdentity *sender,
1649                        const struct GNUNET_STREAM_MessageHeader *msg,
1650                        const struct GNUNET_ATS_Information*atsi)
1651 {
1652   struct GNUNET_STREAM_MessageHeader *reply;
1653
1654   switch (socket->state)
1655   {
1656   case STATE_INIT:
1657   case STATE_LISTEN:
1658   case STATE_HELLO_WAIT:
1659     LOG (GNUNET_ERROR_TYPE_DEBUG,
1660          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1661          GNUNET_i2s (&socket->other_peer));
1662     return GNUNET_OK;
1663   default:
1664     break;
1665   }
1666   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1667        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1668   socket->receive_closed = GNUNET_YES;
1669   if (GNUNET_YES == socket->transmit_closed)
1670     socket->state = STATE_CLOSED;
1671   else
1672     socket->state = STATE_RECEIVE_CLOSED;
1673   /* Send TRANSMIT_CLOSE_ACK */
1674   reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1675   reply->header.type = 
1676       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1677   reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1678   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1679   return GNUNET_OK;
1680 }
1681
1682
1683 /**
1684  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1685  *
1686  * @param cls the socket (set from GNUNET_MESH_connect)
1687  * @param tunnel connection to the other end
1688  * @param tunnel_ctx this is NULL
1689  * @param sender who sent the message
1690  * @param message the actual message
1691  * @param atsi performance data for the connection
1692  * @return GNUNET_OK to keep the connection open,
1693  *         GNUNET_SYSERR to close it (signal serious error)
1694  */
1695 static int
1696 client_handle_transmit_close (void *cls,
1697                               struct GNUNET_MESH_Tunnel *tunnel,
1698                               void **tunnel_ctx,
1699                               const struct GNUNET_PeerIdentity *sender,
1700                               const struct GNUNET_MessageHeader *message,
1701                               const struct GNUNET_ATS_Information*atsi)
1702 {
1703   struct GNUNET_STREAM_Socket *socket = cls;
1704   
1705   return handle_transmit_close (socket,
1706                                 tunnel,
1707                                 sender,
1708                                 (struct GNUNET_STREAM_MessageHeader *)message,
1709                                 atsi);
1710 }
1711
1712
1713 /**
1714  * Task for calling the shutdown continuation callback
1715  *
1716  * @param cls the socket
1717  * @param tc the scheduler task context
1718  */
1719 static void
1720 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1721 {
1722   struct GNUNET_STREAM_Socket *socket = cls;
1723   
1724   GNUNET_assert (NULL != socket->shutdown_handle);
1725   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1726   if (NULL != socket->shutdown_handle->completion_cb)
1727     socket->shutdown_handle->completion_cb
1728         (socket->shutdown_handle->completion_cls,
1729          socket->shutdown_handle->operation);
1730   GNUNET_free (socket->shutdown_handle);
1731   socket->shutdown_handle = NULL;
1732 }
1733
1734
1735 /**
1736  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1737  *
1738  * @param socket the socket
1739  * @param tunnel connection to the other end
1740  * @param sender who sent the message
1741  * @param message the actual message
1742  * @param atsi performance data for the connection
1743  * @param operation the close operation which is being ACK'ed
1744  * @return GNUNET_OK to keep the connection open,
1745  *         GNUNET_SYSERR to close it (signal serious error)
1746  */
1747 static int
1748 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1749                           struct GNUNET_MESH_Tunnel *tunnel,
1750                           const struct GNUNET_PeerIdentity *sender,
1751                           const struct GNUNET_STREAM_MessageHeader *message,
1752                           const struct GNUNET_ATS_Information *atsi,
1753                           int operation)
1754 {
1755   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1756
1757   shutdown_handle = socket->shutdown_handle;
1758   if (NULL == shutdown_handle)
1759   {
1760     /* This may happen when the shudown handle is cancelled */
1761     LOG (GNUNET_ERROR_TYPE_DEBUG,
1762          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1763          GNUNET_i2s (&socket->other_peer));
1764     return GNUNET_OK;
1765   }
1766   switch (operation)
1767   {
1768   case SHUT_RDWR:
1769     switch (socket->state)
1770     {
1771     case STATE_CLOSE_WAIT:
1772       if (SHUT_RDWR != shutdown_handle->operation)
1773       {
1774         LOG (GNUNET_ERROR_TYPE_DEBUG,
1775              "%s: Received CLOSE_ACK when shutdown handle is not for "
1776              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1777         return GNUNET_OK;
1778       }
1779       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1780            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1781       socket->state = STATE_CLOSED;
1782       break;
1783     default:
1784       LOG (GNUNET_ERROR_TYPE_DEBUG,
1785            "%s: Received CLOSE_ACK when it is not expected\n",
1786            GNUNET_i2s (&socket->other_peer));
1787       return GNUNET_OK;
1788     }
1789     break;
1790   case SHUT_RD:
1791     switch (socket->state)
1792     {
1793     case STATE_RECEIVE_CLOSE_WAIT:
1794       if (SHUT_RD != shutdown_handle->operation)
1795       {
1796         LOG (GNUNET_ERROR_TYPE_DEBUG,
1797              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1798              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1799         return GNUNET_OK;
1800       }
1801       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1802            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1803       socket->state = STATE_RECEIVE_CLOSED;
1804       break;
1805     default:
1806       LOG (GNUNET_ERROR_TYPE_DEBUG,
1807            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1808            GNUNET_i2s (&socket->other_peer));
1809       return GNUNET_OK;
1810     }
1811     break;
1812   case SHUT_WR:
1813     switch (socket->state)
1814     {
1815     case STATE_TRANSMIT_CLOSE_WAIT:
1816       if (SHUT_WR != shutdown_handle->operation)
1817       {
1818         LOG (GNUNET_ERROR_TYPE_DEBUG,
1819              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1820              "is not for SHUT_WR\n",
1821              GNUNET_i2s (&socket->other_peer));
1822         return GNUNET_OK;
1823       }
1824       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1825            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1826       socket->state = STATE_TRANSMIT_CLOSED;
1827       break;
1828     default:
1829       LOG (GNUNET_ERROR_TYPE_DEBUG,
1830            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1831            GNUNET_i2s (&socket->other_peer));          
1832       return GNUNET_OK;
1833     }
1834     break;
1835   default:
1836     GNUNET_assert (0);
1837   }
1838   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1839       (&call_cont_task, socket);
1840   if (GNUNET_SCHEDULER_NO_TASK
1841       != shutdown_handle->close_msg_retransmission_task_id)
1842   {
1843     GNUNET_SCHEDULER_cancel
1844       (shutdown_handle->close_msg_retransmission_task_id);
1845     shutdown_handle->close_msg_retransmission_task_id =
1846         GNUNET_SCHEDULER_NO_TASK;
1847   }
1848   return GNUNET_OK;
1849 }
1850
1851
1852 /**
1853  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1854  *
1855  * @param cls the socket (set from GNUNET_MESH_connect)
1856  * @param tunnel connection to the other end
1857  * @param tunnel_ctx this is NULL
1858  * @param sender who sent the message
1859  * @param message the actual message
1860  * @param atsi performance data for the connection
1861  * @return GNUNET_OK to keep the connection open,
1862  *         GNUNET_SYSERR to close it (signal serious error)
1863  */
1864 static int
1865 client_handle_transmit_close_ack (void *cls,
1866                                   struct GNUNET_MESH_Tunnel *tunnel,
1867                                   void **tunnel_ctx,
1868                                   const struct GNUNET_PeerIdentity *sender,
1869                                   const struct GNUNET_MessageHeader *message,
1870                                   const struct GNUNET_ATS_Information*atsi)
1871 {
1872   struct GNUNET_STREAM_Socket *socket = cls;
1873
1874   return handle_generic_close_ack (socket,
1875                                    tunnel,
1876                                    sender,
1877                                    (const struct GNUNET_STREAM_MessageHeader *)
1878                                    message,
1879                                    atsi,
1880                                    SHUT_WR);
1881 }
1882
1883
1884 /**
1885  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1886  *
1887  * @param socket the socket
1888  * @param tunnel connection to the other end
1889  * @param sender who sent the message
1890  * @param message the actual message
1891  * @param atsi performance data for the connection
1892  * @return GNUNET_OK to keep the connection open,
1893  *         GNUNET_SYSERR to close it (signal serious error)
1894  */
1895 static int
1896 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1897                       struct GNUNET_MESH_Tunnel *tunnel,
1898                       const struct GNUNET_PeerIdentity *sender,
1899                       const struct GNUNET_STREAM_MessageHeader *message,
1900                       const struct GNUNET_ATS_Information *atsi)
1901 {
1902   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1903
1904   switch (socket->state)
1905   {
1906   case STATE_INIT:
1907   case STATE_LISTEN:
1908   case STATE_HELLO_WAIT:
1909     LOG (GNUNET_ERROR_TYPE_DEBUG,
1910          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1911          GNUNET_i2s (&socket->other_peer));
1912     return GNUNET_OK;
1913   default:
1914     break;
1915   }  
1916   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1917        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1918   socket->transmit_closed = GNUNET_YES;
1919   if (GNUNET_YES == socket->receive_closed)
1920     socket->state = STATE_CLOSED;
1921   else
1922     socket->state = STATE_TRANSMIT_CLOSED;
1923   receive_close_ack =
1924     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1925   receive_close_ack->header.size =
1926     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1927   receive_close_ack->header.type =
1928     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1929   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1930   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1931      that that stream has been shutdown */
1932   if (NULL != socket->write_handle)
1933   {
1934     GNUNET_STREAM_CompletionContinuation wc;
1935     void *wc_cls;
1936
1937     wc = socket->write_handle->write_cont;
1938     wc_cls = socket->write_handle->write_cont_cls;
1939     GNUNET_STREAM_io_write_cancel (socket->write_handle);
1940     socket->write_handle = NULL;
1941     if (NULL != wc)
1942       wc (wc_cls,
1943           GNUNET_STREAM_SHUTDOWN, 0);
1944   }
1945   return GNUNET_OK;
1946 }
1947
1948
1949 /**
1950  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1951  *
1952  * @param cls the socket (set from GNUNET_MESH_connect)
1953  * @param tunnel connection to the other end
1954  * @param tunnel_ctx this is NULL
1955  * @param sender who sent the message
1956  * @param message the actual message
1957  * @param atsi performance data for the connection
1958  * @return GNUNET_OK to keep the connection open,
1959  *         GNUNET_SYSERR to close it (signal serious error)
1960  */
1961 static int
1962 client_handle_receive_close (void *cls,
1963                              struct GNUNET_MESH_Tunnel *tunnel,
1964                              void **tunnel_ctx,
1965                              const struct GNUNET_PeerIdentity *sender,
1966                              const struct GNUNET_MessageHeader *message,
1967                              const struct GNUNET_ATS_Information*atsi)
1968 {
1969   struct GNUNET_STREAM_Socket *socket = cls;
1970
1971   return
1972     handle_receive_close (socket,
1973                           tunnel,
1974                           sender,
1975                           (const struct GNUNET_STREAM_MessageHeader *) message,
1976                           atsi);
1977 }
1978
1979
1980 /**
1981  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1982  *
1983  * @param cls the socket (set from GNUNET_MESH_connect)
1984  * @param tunnel connection to the other end
1985  * @param tunnel_ctx this is NULL
1986  * @param sender who sent the message
1987  * @param message the actual message
1988  * @param atsi performance data for the connection
1989  * @return GNUNET_OK to keep the connection open,
1990  *         GNUNET_SYSERR to close it (signal serious error)
1991  */
1992 static int
1993 client_handle_receive_close_ack (void *cls,
1994                                  struct GNUNET_MESH_Tunnel *tunnel,
1995                                  void **tunnel_ctx,
1996                                  const struct GNUNET_PeerIdentity *sender,
1997                                  const struct GNUNET_MessageHeader *message,
1998                                  const struct GNUNET_ATS_Information*atsi)
1999 {
2000   struct GNUNET_STREAM_Socket *socket = cls;
2001
2002   return handle_generic_close_ack (socket,
2003                                    tunnel,
2004                                    sender,
2005                                    (const struct GNUNET_STREAM_MessageHeader *)
2006                                    message,
2007                                    atsi,
2008                                    SHUT_RD);
2009 }
2010
2011
2012 /**
2013  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2014  *
2015  * @param socket the socket
2016  * @param tunnel connection to the other end
2017  * @param sender who sent the message
2018  * @param message the actual message
2019  * @param atsi performance data for the connection
2020  * @return GNUNET_OK to keep the connection open,
2021  *         GNUNET_SYSERR to close it (signal serious error)
2022  */
2023 static int
2024 handle_close (struct GNUNET_STREAM_Socket *socket,
2025               struct GNUNET_MESH_Tunnel *tunnel,
2026               const struct GNUNET_PeerIdentity *sender,
2027               const struct GNUNET_STREAM_MessageHeader *message,
2028               const struct GNUNET_ATS_Information*atsi)
2029 {
2030   struct GNUNET_STREAM_MessageHeader *close_ack;
2031
2032   switch (socket->state)
2033   {
2034   case STATE_INIT:
2035   case STATE_LISTEN:
2036   case STATE_HELLO_WAIT:
2037     LOG (GNUNET_ERROR_TYPE_DEBUG,
2038          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2039          GNUNET_i2s (&socket->other_peer));
2040     return GNUNET_OK;
2041   default:
2042     break;
2043   }
2044   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2045        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2046   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2047   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2048   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2049   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2050   if (STATE_CLOSED == socket->state)
2051     return GNUNET_OK;
2052   socket->receive_closed = GNUNET_YES;
2053   socket->transmit_closed = GNUNET_YES;
2054   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2055   socket->receive_buffer = NULL;
2056   socket->receive_buffer_size = 0;  
2057   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
2058      that that stream has been shutdown */
2059   if (NULL != socket->write_handle)
2060   {
2061     GNUNET_STREAM_CompletionContinuation wc;
2062     void *wc_cls;
2063
2064     wc = socket->write_handle->write_cont;
2065     wc_cls = socket->write_handle->write_cont_cls;
2066     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2067     socket->write_handle = NULL;
2068     if (NULL != wc)
2069       wc (wc_cls,
2070           GNUNET_STREAM_SHUTDOWN, 0);
2071   }
2072   return GNUNET_OK;
2073 }
2074
2075
2076 /**
2077  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2078  *
2079  * @param cls the socket (set from GNUNET_MESH_connect)
2080  * @param tunnel connection to the other end
2081  * @param tunnel_ctx this is NULL
2082  * @param sender who sent the message
2083  * @param message the actual message
2084  * @param atsi performance data for the connection
2085  * @return GNUNET_OK to keep the connection open,
2086  *         GNUNET_SYSERR to close it (signal serious error)
2087  */
2088 static int
2089 client_handle_close (void *cls,
2090                      struct GNUNET_MESH_Tunnel *tunnel,
2091                      void **tunnel_ctx,
2092                      const struct GNUNET_PeerIdentity *sender,
2093                      const struct GNUNET_MessageHeader *message,
2094                      const struct GNUNET_ATS_Information*atsi)
2095 {
2096   struct GNUNET_STREAM_Socket *socket = cls;
2097
2098   return handle_close (socket,
2099                        tunnel,
2100                        sender,
2101                        (const struct GNUNET_STREAM_MessageHeader *) message,
2102                        atsi);
2103 }
2104
2105
2106 /**
2107  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2108  *
2109  * @param cls the socket (set from GNUNET_MESH_connect)
2110  * @param tunnel connection to the other end
2111  * @param tunnel_ctx this is NULL
2112  * @param sender who sent the message
2113  * @param message the actual message
2114  * @param atsi performance data for the connection
2115  * @return GNUNET_OK to keep the connection open,
2116  *         GNUNET_SYSERR to close it (signal serious error)
2117  */
2118 static int
2119 client_handle_close_ack (void *cls,
2120                          struct GNUNET_MESH_Tunnel *tunnel,
2121                          void **tunnel_ctx,
2122                          const struct GNUNET_PeerIdentity *sender,
2123                          const struct GNUNET_MessageHeader *message,
2124                          const struct GNUNET_ATS_Information *atsi)
2125 {
2126   struct GNUNET_STREAM_Socket *socket = cls;
2127
2128   return handle_generic_close_ack (socket,
2129                                    tunnel,
2130                                    sender,
2131                                    (const struct GNUNET_STREAM_MessageHeader *) 
2132                                    message,
2133                                    atsi,
2134                                    SHUT_RDWR);
2135 }
2136
2137 /*****************************/
2138 /* Server's Message Handlers */
2139 /*****************************/
2140
2141 /**
2142  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2143  *
2144  * @param cls the closure
2145  * @param tunnel connection to the other end
2146  * @param tunnel_ctx the socket
2147  * @param sender who sent the message
2148  * @param message the actual message
2149  * @param atsi performance data for the connection
2150  * @return GNUNET_OK to keep the connection open,
2151  *         GNUNET_SYSERR to close it (signal serious error)
2152  */
2153 static int
2154 server_handle_data (void *cls,
2155                     struct GNUNET_MESH_Tunnel *tunnel,
2156                     void **tunnel_ctx,
2157                     const struct GNUNET_PeerIdentity *sender,
2158                     const struct GNUNET_MessageHeader *message,
2159                     const struct GNUNET_ATS_Information*atsi)
2160 {
2161   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2162
2163   return handle_data (socket,
2164                       tunnel,
2165                       sender,
2166                       (const struct GNUNET_STREAM_DataMessage *)message,
2167                       atsi);
2168 }
2169
2170
2171 /**
2172  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2173  *
2174  * @param cls the closure
2175  * @param tunnel connection to the other end
2176  * @param tunnel_ctx the socket
2177  * @param sender who sent the message
2178  * @param message the actual message
2179  * @param atsi performance data for the connection
2180  * @return GNUNET_OK to keep the connection open,
2181  *         GNUNET_SYSERR to close it (signal serious error)
2182  */
2183 static int
2184 server_handle_hello (void *cls,
2185                      struct GNUNET_MESH_Tunnel *tunnel,
2186                      void **tunnel_ctx,
2187                      const struct GNUNET_PeerIdentity *sender,
2188                      const struct GNUNET_MessageHeader *message,
2189                      const struct GNUNET_ATS_Information*atsi)
2190 {
2191   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2192   struct GNUNET_STREAM_HelloAckMessage *reply;
2193
2194   if (0 != memcmp (sender,
2195                    &socket->other_peer,
2196                    sizeof (struct GNUNET_PeerIdentity)))
2197   {
2198     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2199                GNUNET_i2s (&socket->other_peer));
2200     return GNUNET_YES;
2201   }
2202   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2203   GNUNET_assert (socket->tunnel == tunnel);
2204   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2205              GNUNET_i2s (&socket->other_peer));
2206   switch (socket->state)
2207   {
2208   case STATE_INIT:
2209     reply = generate_hello_ack (socket, GNUNET_YES);
2210     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2211                    GNUNET_NO);
2212     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2213                    socket->control_retransmission_task_id);
2214     socket->control_retransmission_task_id =
2215       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2216                                     &control_retransmission_task, socket);
2217     break;
2218   case STATE_HELLO_WAIT:
2219     /* Perhaps our HELLO_ACK was lost */
2220     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2221                    socket->control_retransmission_task_id);
2222     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2223     socket->control_retransmission_task_id =
2224       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2225     break;
2226   default:
2227     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2228                GNUNET_i2s (&socket->other_peer), socket->state);
2229     /* FIXME: Send RESET? */
2230   }
2231   return GNUNET_OK;
2232 }
2233
2234
2235 /**
2236  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2237  *
2238  * @param cls the closure
2239  * @param tunnel connection to the other end
2240  * @param tunnel_ctx the socket
2241  * @param sender who sent the message
2242  * @param message the actual message
2243  * @param atsi performance data for the connection
2244  * @return GNUNET_OK to keep the connection open,
2245  *         GNUNET_SYSERR to close it (signal serious error)
2246  */
2247 static int
2248 server_handle_hello_ack (void *cls,
2249                          struct GNUNET_MESH_Tunnel *tunnel,
2250                          void **tunnel_ctx,
2251                          const struct GNUNET_PeerIdentity *sender,
2252                          const struct GNUNET_MessageHeader *message,
2253                          const struct GNUNET_ATS_Information*atsi)
2254 {
2255   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2256   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2257
2258   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2259                  ntohs (message->type));
2260   GNUNET_assert (socket->tunnel == tunnel);
2261   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2262   switch (socket->state)  
2263   {
2264   case STATE_HELLO_WAIT:
2265     LOG (GNUNET_ERROR_TYPE_DEBUG,
2266          "%s: Received HELLO_ACK from %s\n",
2267          GNUNET_i2s (&socket->other_peer),
2268          GNUNET_i2s (&socket->other_peer));
2269     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2270     LOG (GNUNET_ERROR_TYPE_DEBUG,
2271          "%s: Read sequence number %u\n",
2272          GNUNET_i2s (&socket->other_peer),
2273          (unsigned int) socket->read_sequence_number);
2274     socket->receiver_window_available = 
2275       ntohl (ack_message->receiver_window_size);
2276     set_state_established (NULL, socket);
2277     break;
2278   default:
2279     LOG (GNUNET_ERROR_TYPE_DEBUG,
2280          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2281   }
2282   return GNUNET_OK;
2283 }
2284
2285
2286 /**
2287  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2288  *
2289  * @param cls the closure
2290  * @param tunnel connection to the other end
2291  * @param tunnel_ctx the socket
2292  * @param sender who sent the message
2293  * @param message the actual message
2294  * @param atsi performance data for the connection
2295  * @return GNUNET_OK to keep the connection open,
2296  *         GNUNET_SYSERR to close it (signal serious error)
2297  */
2298 static int
2299 server_handle_reset (void *cls,
2300                      struct GNUNET_MESH_Tunnel *tunnel,
2301                      void **tunnel_ctx,
2302                      const struct GNUNET_PeerIdentity *sender,
2303                      const struct GNUNET_MessageHeader *message,
2304                      const struct GNUNET_ATS_Information*atsi)
2305 {
2306   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2307
2308   return GNUNET_OK;
2309 }
2310
2311
2312 /**
2313  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2314  *
2315  * @param cls the closure
2316  * @param tunnel connection to the other end
2317  * @param tunnel_ctx the socket
2318  * @param sender who sent the message
2319  * @param message the actual message
2320  * @param atsi performance data for the connection
2321  * @return GNUNET_OK to keep the connection open,
2322  *         GNUNET_SYSERR to close it (signal serious error)
2323  */
2324 static int
2325 server_handle_transmit_close (void *cls,
2326                               struct GNUNET_MESH_Tunnel *tunnel,
2327                               void **tunnel_ctx,
2328                               const struct GNUNET_PeerIdentity *sender,
2329                               const struct GNUNET_MessageHeader *message,
2330                               const struct GNUNET_ATS_Information*atsi)
2331 {
2332   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2333
2334   return handle_transmit_close (socket,
2335                                 tunnel,
2336                                 sender,
2337                                 (struct GNUNET_STREAM_MessageHeader *)message,
2338                                 atsi);
2339 }
2340
2341
2342 /**
2343  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2344  *
2345  * @param cls the closure
2346  * @param tunnel connection to the other end
2347  * @param tunnel_ctx the socket
2348  * @param sender who sent the message
2349  * @param message the actual message
2350  * @param atsi performance data for the connection
2351  * @return GNUNET_OK to keep the connection open,
2352  *         GNUNET_SYSERR to close it (signal serious error)
2353  */
2354 static int
2355 server_handle_transmit_close_ack (void *cls,
2356                                   struct GNUNET_MESH_Tunnel *tunnel,
2357                                   void **tunnel_ctx,
2358                                   const struct GNUNET_PeerIdentity *sender,
2359                                   const struct GNUNET_MessageHeader *message,
2360                                   const struct GNUNET_ATS_Information*atsi)
2361 {
2362   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2363
2364   return handle_generic_close_ack (socket,
2365                                    tunnel,
2366                                    sender,
2367                                    (const struct GNUNET_STREAM_MessageHeader *)
2368                                    message,
2369                                    atsi,
2370                                    SHUT_WR);
2371 }
2372
2373
2374 /**
2375  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2376  *
2377  * @param cls the closure
2378  * @param tunnel connection to the other end
2379  * @param tunnel_ctx the socket
2380  * @param sender who sent the message
2381  * @param message the actual message
2382  * @param atsi performance data for the connection
2383  * @return GNUNET_OK to keep the connection open,
2384  *         GNUNET_SYSERR to close it (signal serious error)
2385  */
2386 static int
2387 server_handle_receive_close (void *cls,
2388                              struct GNUNET_MESH_Tunnel *tunnel,
2389                              void **tunnel_ctx,
2390                              const struct GNUNET_PeerIdentity *sender,
2391                              const struct GNUNET_MessageHeader *message,
2392                              const struct GNUNET_ATS_Information*atsi)
2393 {
2394   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2395
2396   return
2397     handle_receive_close (socket,
2398                           tunnel,
2399                           sender,
2400                           (const struct GNUNET_STREAM_MessageHeader *) message,
2401                           atsi);
2402 }
2403
2404
2405 /**
2406  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2407  *
2408  * @param cls the closure
2409  * @param tunnel connection to the other end
2410  * @param tunnel_ctx the socket
2411  * @param sender who sent the message
2412  * @param message the actual message
2413  * @param atsi performance data for the connection
2414  * @return GNUNET_OK to keep the connection open,
2415  *         GNUNET_SYSERR to close it (signal serious error)
2416  */
2417 static int
2418 server_handle_receive_close_ack (void *cls,
2419                                  struct GNUNET_MESH_Tunnel *tunnel,
2420                                  void **tunnel_ctx,
2421                                  const struct GNUNET_PeerIdentity *sender,
2422                                  const struct GNUNET_MessageHeader *message,
2423                                  const struct GNUNET_ATS_Information*atsi)
2424 {
2425   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2426
2427   return handle_generic_close_ack (socket,
2428                                    tunnel,
2429                                    sender,
2430                                    (const struct GNUNET_STREAM_MessageHeader *)
2431                                    message,
2432                                    atsi,
2433                                    SHUT_RD);
2434 }
2435
2436
2437 /**
2438  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2439  *
2440  * @param cls the listen socket (from GNUNET_MESH_connect in
2441  *          GNUNET_STREAM_listen) 
2442  * @param tunnel connection to the other end
2443  * @param tunnel_ctx the socket
2444  * @param sender who sent the message
2445  * @param message the actual message
2446  * @param atsi performance data for the connection
2447  * @return GNUNET_OK to keep the connection open,
2448  *         GNUNET_SYSERR to close it (signal serious error)
2449  */
2450 static int
2451 server_handle_close (void *cls,
2452                      struct GNUNET_MESH_Tunnel *tunnel,
2453                      void **tunnel_ctx,
2454                      const struct GNUNET_PeerIdentity *sender,
2455                      const struct GNUNET_MessageHeader *message,
2456                      const struct GNUNET_ATS_Information*atsi)
2457 {
2458   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2459   
2460   return handle_close (socket,
2461                        tunnel,
2462                        sender,
2463                        (const struct GNUNET_STREAM_MessageHeader *) message,
2464                        atsi);
2465 }
2466
2467
2468 /**
2469  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2470  *
2471  * @param cls the closure
2472  * @param tunnel connection to the other end
2473  * @param tunnel_ctx the socket
2474  * @param sender who sent the message
2475  * @param message the actual message
2476  * @param atsi performance data for the connection
2477  * @return GNUNET_OK to keep the connection open,
2478  *         GNUNET_SYSERR to close it (signal serious error)
2479  */
2480 static int
2481 server_handle_close_ack (void *cls,
2482                          struct GNUNET_MESH_Tunnel *tunnel,
2483                          void **tunnel_ctx,
2484                          const struct GNUNET_PeerIdentity *sender,
2485                          const struct GNUNET_MessageHeader *message,
2486                          const struct GNUNET_ATS_Information*atsi)
2487 {
2488   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2489
2490   return handle_generic_close_ack (socket,
2491                                    tunnel,
2492                                    sender,
2493                                    (const struct GNUNET_STREAM_MessageHeader *) 
2494                                    message,
2495                                    atsi,
2496                                    SHUT_RDWR);
2497 }
2498
2499
2500 /**
2501  * Handler for DATA_ACK messages
2502  *
2503  * @param socket the socket through which the ack was received
2504  * @param tunnel connection to the other end
2505  * @param sender who sent the message
2506  * @param ack the acknowledgment message
2507  * @param atsi performance data for the connection
2508  * @return GNUNET_OK to keep the connection open,
2509  *         GNUNET_SYSERR to close it (signal serious error)
2510  */
2511 static int
2512 handle_ack (struct GNUNET_STREAM_Socket *socket,
2513             struct GNUNET_MESH_Tunnel *tunnel,
2514             const struct GNUNET_PeerIdentity *sender,
2515             const struct GNUNET_STREAM_AckMessage *ack,
2516             const struct GNUNET_ATS_Information*atsi)
2517 {
2518   struct GNUNET_STREAM_IOWriteHandle *write_handle;    
2519   unsigned int packet;
2520   int need_retransmission;
2521   uint32_t sequence_difference;
2522
2523   if (0 != memcmp (sender,
2524                    &socket->other_peer,
2525                    sizeof (struct GNUNET_PeerIdentity)))
2526   {
2527     LOG (GNUNET_ERROR_TYPE_DEBUG,
2528          "%s: Received ACK from non-confirming peer\n",
2529          GNUNET_i2s (&socket->other_peer));
2530     return GNUNET_YES;
2531   }
2532   switch (socket->state)
2533   {
2534   case (STATE_ESTABLISHED):
2535   case (STATE_RECEIVE_CLOSED):
2536   case (STATE_RECEIVE_CLOSE_WAIT):
2537     if (NULL == socket->write_handle)
2538     {
2539       LOG (GNUNET_ERROR_TYPE_DEBUG,
2540            "%s: Received DATA_ACK when write_handle is NULL\n",
2541            GNUNET_i2s (&socket->other_peer));
2542       return GNUNET_OK;
2543     }
2544     sequence_difference = 
2545         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2546     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2547     {
2548       LOG (GNUNET_ERROR_TYPE_DEBUG,
2549            "%s: Received DATA_ACK with unexpected base sequence number\n",
2550            GNUNET_i2s (&socket->other_peer));
2551       LOG (GNUNET_ERROR_TYPE_DEBUG,
2552            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2553            GNUNET_i2s (&socket->other_peer),
2554            socket->write_sequence_number,
2555            ntohl (ack->base_sequence_number));
2556       return GNUNET_OK;
2557     }
2558     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2559          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2560     /* Cancel the retransmission task */
2561     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2562     {
2563       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2564       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2565       socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
2566     }
2567     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2568     {
2569       if (NULL == socket->write_handle->messages[packet])
2570         break;
2571       /* BS: Base sequence from ack; PS: sequence num of current packet */
2572       sequence_difference = ntohl (ack->base_sequence_number)
2573         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2574       if ((0 == sequence_difference) ||
2575           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2576         break; /* The message in our handle is not yet received */
2577       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2578       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2579       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2580                             packet, GNUNET_YES);
2581     }
2582     /* Update the receive window remaining
2583        FIXME : Should update with the value from a data ack with greater
2584        sequence number */
2585     if (((ntohl (ack->base_sequence_number)
2586          - (socket->write_handle->max_ack_base_num))
2587           <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2588     {
2589       socket->write_handle->max_ack_base_num = ntohl (ack->base_sequence_number);
2590       socket->receiver_window_available = 
2591           ntohl (ack->receive_window_remaining);
2592     }
2593     else
2594       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2595                   "Ignoring to modify receive window available as base: %u, max_ack_base: %u\n",
2596                   ntohl (ack->base_sequence_number),
2597                    socket->write_handle->max_ack_base_num);
2598     if ((packet == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2599         || ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2600             && (NULL == socket->write_handle->messages[packet])))
2601       goto call_write_cont_cb;
2602     GNUNET_assert (ntohl
2603                    (socket->write_handle->messages[packet]->sequence_number)
2604                    == ntohl (ack->base_sequence_number));
2605     /* Check if we have received all acknowledgements */
2606     need_retransmission = GNUNET_NO;
2607     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2608     {
2609       if (NULL == socket->write_handle->messages[packet]) break;
2610       if (GNUNET_YES != ackbitmap_is_bit_set 
2611           (&socket->write_handle->ack_bitmap,packet))
2612       {
2613         need_retransmission = GNUNET_YES;
2614         break;
2615       }
2616     }
2617     if (GNUNET_YES == need_retransmission)
2618     {
2619       write_data (socket);
2620       return GNUNET_OK;
2621     }
2622
2623   call_write_cont_cb:
2624     /* Free the packets */
2625     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2626     {
2627       GNUNET_free_non_null (socket->write_handle->messages[packet]);
2628     }
2629     write_handle = socket->write_handle;
2630     socket->write_handle = NULL;
2631     if (NULL != write_handle->write_cont)
2632       write_handle->write_cont (write_handle->write_cont_cls,
2633                                 socket->status,
2634                                 write_handle->size);
2635     /* We are done with the write handle - Freeing it */
2636     GNUNET_free (write_handle);
2637     LOG (GNUNET_ERROR_TYPE_DEBUG,
2638          "%s: Write completion callback completed\n",
2639          GNUNET_i2s (&socket->other_peer));      
2640     break;
2641   default:
2642     break;
2643   }
2644   return GNUNET_OK;
2645 }
2646
2647
2648 /**
2649  * Handler for DATA_ACK messages
2650  *
2651  * @param cls the 'struct GNUNET_STREAM_Socket'
2652  * @param tunnel connection to the other end
2653  * @param tunnel_ctx unused
2654  * @param sender who sent the message
2655  * @param message the actual message
2656  * @param atsi performance data for the connection
2657  * @return GNUNET_OK to keep the connection open,
2658  *         GNUNET_SYSERR to close it (signal serious error)
2659  */
2660 static int
2661 client_handle_ack (void *cls,
2662                    struct GNUNET_MESH_Tunnel *tunnel,
2663                    void **tunnel_ctx,
2664                    const struct GNUNET_PeerIdentity *sender,
2665                    const struct GNUNET_MessageHeader *message,
2666                    const struct GNUNET_ATS_Information*atsi)
2667 {
2668   struct GNUNET_STREAM_Socket *socket = cls;
2669   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2670  
2671   return handle_ack (socket, tunnel, sender, ack, atsi);
2672 }
2673
2674
2675 /**
2676  * Handler for DATA_ACK messages
2677  *
2678  * @param cls the server's listen socket
2679  * @param tunnel connection to the other end
2680  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2681  * @param sender who sent the message
2682  * @param message the actual message
2683  * @param atsi performance data for the connection
2684  * @return GNUNET_OK to keep the connection open,
2685  *         GNUNET_SYSERR to close it (signal serious error)
2686  */
2687 static int
2688 server_handle_ack (void *cls,
2689                    struct GNUNET_MESH_Tunnel *tunnel,
2690                    void **tunnel_ctx,
2691                    const struct GNUNET_PeerIdentity *sender,
2692                    const struct GNUNET_MessageHeader *message,
2693                    const struct GNUNET_ATS_Information*atsi)
2694 {
2695   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2696   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2697  
2698   return handle_ack (socket, tunnel, sender, ack, atsi);
2699 }
2700
2701
2702 /**
2703  * For client message handlers, the stream socket is in the
2704  * closure argument.
2705  */
2706 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2707   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2708   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2709    sizeof (struct GNUNET_STREAM_AckMessage) },
2710   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2711    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2712   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2713    sizeof (struct GNUNET_STREAM_MessageHeader)},
2714   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2715    sizeof (struct GNUNET_STREAM_MessageHeader)},
2716   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2717    sizeof (struct GNUNET_STREAM_MessageHeader)},
2718   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2719    sizeof (struct GNUNET_STREAM_MessageHeader)},
2720   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2721    sizeof (struct GNUNET_STREAM_MessageHeader)},
2722   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2723    sizeof (struct GNUNET_STREAM_MessageHeader)},
2724   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2725    sizeof (struct GNUNET_STREAM_MessageHeader)},
2726   {NULL, 0, 0}
2727 };
2728
2729
2730 /**
2731  * For server message handlers, the stream socket is in the
2732  * tunnel context, and the listen socket in the closure argument.
2733  */
2734 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2735   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2736   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2737    sizeof (struct GNUNET_STREAM_AckMessage) },
2738   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2739    sizeof (struct GNUNET_STREAM_MessageHeader)},
2740   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2741    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2742   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2743    sizeof (struct GNUNET_STREAM_MessageHeader)},
2744   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2745    sizeof (struct GNUNET_STREAM_MessageHeader)},
2746   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2747    sizeof (struct GNUNET_STREAM_MessageHeader)},
2748   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2749    sizeof (struct GNUNET_STREAM_MessageHeader)},
2750   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2751    sizeof (struct GNUNET_STREAM_MessageHeader)},
2752   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2753    sizeof (struct GNUNET_STREAM_MessageHeader)},
2754   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2755    sizeof (struct GNUNET_STREAM_MessageHeader)},
2756   {NULL, 0, 0}
2757 };
2758
2759
2760 /**
2761  * Function called when our target peer is connected to our tunnel
2762  *
2763  * @param cls the socket for which this tunnel is created
2764  * @param peer the peer identity of the target
2765  * @param atsi performance data for the connection
2766  */
2767 static void
2768 mesh_peer_connect_callback (void *cls,
2769                             const struct GNUNET_PeerIdentity *peer,
2770                             const struct GNUNET_ATS_Information * atsi)
2771 {
2772   struct GNUNET_STREAM_Socket *socket = cls;
2773   struct GNUNET_STREAM_MessageHeader *message;
2774   
2775   if (0 != memcmp (peer,
2776                    &socket->other_peer,
2777                    sizeof (struct GNUNET_PeerIdentity)))
2778   {
2779     LOG (GNUNET_ERROR_TYPE_DEBUG,
2780          "%s: A peer which is not our target has connected to our tunnel\n",
2781          GNUNET_i2s(peer));
2782     return;
2783   }
2784   LOG (GNUNET_ERROR_TYPE_DEBUG,
2785        "%s: Target peer %s connected\n",
2786        GNUNET_i2s (&socket->other_peer),
2787        GNUNET_i2s (&socket->other_peer));
2788   /* Set state to INIT */
2789   socket->state = STATE_INIT;
2790   /* Send HELLO message */
2791   message = generate_hello ();
2792   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2793   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2794                  socket->control_retransmission_task_id);
2795   socket->control_retransmission_task_id =
2796     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2797                                   &control_retransmission_task, socket);
2798 }
2799
2800
2801 /**
2802  * Function called when our target peer is disconnected from our tunnel
2803  *
2804  * @param cls the socket associated which this tunnel
2805  * @param peer the peer identity of the target
2806  */
2807 static void
2808 mesh_peer_disconnect_callback (void *cls,
2809                                const struct GNUNET_PeerIdentity *peer)
2810 {
2811   struct GNUNET_STREAM_Socket *socket=cls;
2812   
2813   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2814   LOG (GNUNET_ERROR_TYPE_DEBUG,
2815        "%s: Other peer %s disconnected \n",
2816        GNUNET_i2s (&socket->other_peer),
2817        GNUNET_i2s (&socket->other_peer));
2818 }
2819
2820
2821 /**
2822  * Method called whenever a peer creates a tunnel to us
2823  *
2824  * @param cls closure
2825  * @param tunnel new handle to the tunnel
2826  * @param initiator peer that started the tunnel
2827  * @param atsi performance information for the tunnel
2828  * @return initial tunnel context for the tunnel
2829  *         (can be NULL -- that's not an error)
2830  */
2831 static void *
2832 new_tunnel_notify (void *cls,
2833                    struct GNUNET_MESH_Tunnel *tunnel,
2834                    const struct GNUNET_PeerIdentity *initiator,
2835                    const struct GNUNET_ATS_Information *atsi)
2836 {
2837   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2838   struct GNUNET_STREAM_Socket *socket;
2839
2840   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2841      from the same peer again until the socket is closed */
2842
2843   if (GNUNET_NO == lsocket->listening)
2844   {
2845     GNUNET_MESH_tunnel_destroy (tunnel);
2846     return NULL;
2847   }
2848   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2849   socket->other_peer = *initiator;
2850   socket->tunnel = tunnel;
2851   socket->state = STATE_INIT;
2852   socket->lsocket = lsocket;
2853   socket->stat_handle = lsocket->stat_handle;
2854   socket->retransmit_timeout = lsocket->retransmit_timeout;
2855   socket->testing_active = lsocket->testing_active;
2856   socket->testing_set_write_sequence_number_value =
2857       lsocket->testing_set_write_sequence_number_value;
2858   socket->max_payload_size = lsocket->max_payload_size;
2859   LOG (GNUNET_ERROR_TYPE_DEBUG,
2860        "%s: Peer %s initiated tunnel to us\n", 
2861        GNUNET_i2s (&socket->other_peer),
2862        GNUNET_i2s (&socket->other_peer));
2863   if (NULL != socket->stat_handle)
2864   {
2865     GNUNET_STATISTICS_update (socket->stat_handle,
2866                               "total inbound connections received",
2867                               1, GNUNET_NO);
2868     GNUNET_STATISTICS_update (socket->stat_handle,
2869                               "inbound connections", 1, GNUNET_NO);
2870   }
2871   
2872   return socket;
2873 }
2874
2875
2876 /**
2877  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2878  * any associated state.  This function is NOT called if the client has
2879  * explicitly asked for the tunnel to be destroyed using
2880  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2881  * the tunnel.
2882  *
2883  * @param cls closure (set from GNUNET_MESH_connect)
2884  * @param tunnel connection to the other end (henceforth invalid)
2885  * @param tunnel_ctx place where local state associated
2886  *                   with the tunnel is stored
2887  */
2888 static void 
2889 tunnel_cleaner (void *cls,
2890                 const struct GNUNET_MESH_Tunnel *tunnel,
2891                 void *tunnel_ctx)
2892 {
2893   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2894   struct MessageQueue *head;
2895
2896   GNUNET_assert (tunnel == socket->tunnel);
2897   GNUNET_break_op(0);
2898   LOG (GNUNET_ERROR_TYPE_DEBUG,
2899        "%s: Peer %s has terminated connection abruptly\n",
2900        GNUNET_i2s (&socket->other_peer),
2901        GNUNET_i2s (&socket->other_peer));
2902   if (NULL != socket->stat_handle)
2903   {
2904     GNUNET_STATISTICS_update (socket->stat_handle,
2905                               "connections terminated abruptly", 1, GNUNET_NO);
2906     GNUNET_STATISTICS_update (socket->stat_handle,
2907                               "inbound connections", -1, GNUNET_NO);
2908   }
2909   socket->status = GNUNET_STREAM_SHUTDOWN;
2910   /* Clear Transmit handles */
2911   if (NULL != socket->transmit_handle)
2912   {
2913     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2914     socket->transmit_handle = NULL;
2915   }
2916   /* Stop Tasks using socket->tunnel */
2917   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2918   {
2919     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2920     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2921   }
2922   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2923   {
2924     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2925     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2926   }  
2927   /* Terminate the control retransmission tasks */
2928   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2929   {
2930     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2931     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2932   }
2933   /* Clear Transmit handles */
2934   if (NULL != socket->transmit_handle)
2935   {
2936     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2937     socket->transmit_handle = NULL;
2938   }
2939   /* Clear existing message queue */
2940   while (NULL != (head = socket->queue_head)) {
2941     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2942                                  socket->queue_tail,
2943                                  head);
2944     GNUNET_free (head->message);
2945     GNUNET_free (head);
2946   }
2947   socket->tunnel = NULL;
2948 }
2949
2950
2951 /**
2952  * Callback to signal timeout on lockmanager lock acquire
2953  *
2954  * @param cls the ListenSocket
2955  * @param tc the scheduler task context
2956  */
2957 static void
2958 lockmanager_acquire_timeout (void *cls, 
2959                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2960 {
2961   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2962   GNUNET_STREAM_ListenCallback listen_cb;
2963   void *listen_cb_cls;
2964
2965   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2966   listen_cb = lsocket->listen_cb;
2967   listen_cb_cls = lsocket->listen_cb_cls;
2968   if (NULL != listen_cb)
2969     listen_cb (listen_cb_cls, NULL, NULL);
2970 }
2971
2972
2973 /**
2974  * Callback to notify us on the status changes on app_port lock
2975  *
2976  * @param cls the ListenSocket
2977  * @param domain the domain name of the lock
2978  * @param lock the app_port
2979  * @param status the current status of the lock
2980  */
2981 static void
2982 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2983                        enum GNUNET_LOCKMANAGER_Status status)
2984 {
2985   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2986
2987   GNUNET_assert (lock == (uint32_t) lsocket->port);
2988   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2989   {
2990     lsocket->listening = GNUNET_YES;
2991     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2992     {
2993       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2994       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2995     }
2996     if (NULL == lsocket->mesh)
2997     {
2998       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2999
3000       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
3001                                            lsocket, /* Closure */
3002                                            &new_tunnel_notify,
3003                                            &tunnel_cleaner,
3004                                            server_message_handlers,
3005                                            ports);
3006       GNUNET_assert (NULL != lsocket->mesh);
3007       if (NULL != lsocket->listen_ok_cb)
3008       {
3009         (void) lsocket->listen_ok_cb ();
3010       }
3011     }
3012   }
3013   if (GNUNET_LOCKMANAGER_RELEASE == status)
3014     lsocket->listening = GNUNET_NO;
3015 }
3016
3017
3018 /*****************/
3019 /* API functions */
3020 /*****************/
3021
3022
3023 /**
3024  * Tries to open a stream to the target peer
3025  *
3026  * @param cfg configuration to use
3027  * @param target the target peer to which the stream has to be opened
3028  * @param app_port the application port number which uniquely identifies this
3029  *            stream
3030  * @param open_cb this function will be called after stream has be established;
3031  *          cannot be NULL
3032  * @param open_cb_cls the closure for open_cb
3033  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3034  * @return if successful it returns the stream socket; NULL if stream cannot be
3035  *         opened 
3036  */
3037 struct GNUNET_STREAM_Socket *
3038 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3039                     const struct GNUNET_PeerIdentity *target,
3040                     GNUNET_MESH_ApplicationType app_port,
3041                     GNUNET_STREAM_OpenCallback open_cb,
3042                     void *open_cb_cls,
3043                     ...)
3044 {
3045   struct GNUNET_STREAM_Socket *socket;
3046   enum GNUNET_STREAM_Option option;
3047   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3048   va_list vargs;
3049   uint16_t payload_size;
3050
3051   LOG (GNUNET_ERROR_TYPE_DEBUG,
3052        "%s\n", __func__);
3053   GNUNET_assert (NULL != open_cb);
3054   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3055   socket->other_peer = *target;
3056   socket->open_cb = open_cb;
3057   socket->open_cls = open_cb_cls;
3058   /* Set defaults */
3059   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3060   socket->testing_active = GNUNET_NO;
3061   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3062   va_start (vargs, open_cb_cls); /* Parse variable args */
3063   do {
3064     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3065     switch (option)
3066     {
3067     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3068       /* Expect struct GNUNET_TIME_Relative */
3069       socket->retransmit_timeout = va_arg (vargs,
3070                                            struct GNUNET_TIME_Relative);
3071       break;
3072     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3073       socket->testing_active = GNUNET_YES;
3074       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3075                                                                 uint32_t);
3076       break;
3077     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3078       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3079       break;
3080     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3081       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3082       break;
3083     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3084       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3085       GNUNET_assert (0 != payload_size);
3086       if (payload_size < socket->max_payload_size)
3087         socket->max_payload_size = payload_size;
3088       break;
3089     case GNUNET_STREAM_OPTION_END:
3090       break;
3091     }
3092   } while (GNUNET_STREAM_OPTION_END != option);
3093   va_end (vargs);               /* End of variable args parsing */
3094   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3095                                       socket, /* cls */
3096                                       NULL, /* No inbound tunnel handler */
3097                                       NULL, /* No in-tunnel cleaner */
3098                                       client_message_handlers,
3099                                       ports); /* We don't get inbound tunnels */
3100   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3101   {
3102     GNUNET_free (socket);
3103     return NULL;
3104   }
3105   /* Now create the mesh tunnel to target */
3106   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3107   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3108                                               NULL, /* Tunnel context */
3109                                               &mesh_peer_connect_callback,
3110                                               &mesh_peer_disconnect_callback,
3111                                               socket);
3112   GNUNET_assert (NULL != socket->tunnel);
3113   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3114                                         &socket->other_peer);
3115   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3116   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3117   return socket;
3118 }
3119
3120
3121 /**
3122  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3123  *
3124  * @param socket the stream socket
3125  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3126  * @param completion_cb the callback that will be called upon successful
3127  *          shutdown of given operation
3128  * @param completion_cls the closure for the completion callback
3129  * @return the shutdown handle
3130  */
3131 struct GNUNET_STREAM_ShutdownHandle *
3132 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3133                         int operation,
3134                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3135                         void *completion_cls)
3136 {
3137   struct GNUNET_STREAM_ShutdownHandle *handle;
3138   struct GNUNET_STREAM_MessageHeader *msg;
3139   
3140   GNUNET_assert (NULL == socket->shutdown_handle);
3141   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3142   handle->socket = socket;
3143   handle->completion_cb = completion_cb;
3144   handle->completion_cls = completion_cls;
3145   socket->shutdown_handle = handle;
3146   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3147        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3148        || ((GNUNET_YES == socket->transmit_closed) 
3149            && (GNUNET_YES == socket->receive_closed)
3150            && (SHUT_RDWR == operation)) )
3151   {
3152     handle->operation = operation;
3153     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3154                                                           socket);
3155     return handle;
3156   }
3157   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3158   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3159   switch (operation)
3160   {
3161   case SHUT_RD:
3162     handle->operation = SHUT_RD;
3163     if (NULL != socket->read_handle)
3164       LOG (GNUNET_ERROR_TYPE_WARNING,
3165            "Existing read handle should be cancelled before shutting"
3166            " down reading\n");
3167     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3168     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3169                    GNUNET_NO);
3170     socket->receive_closed = GNUNET_YES;
3171     break;
3172   case SHUT_WR:
3173     handle->operation = SHUT_WR;
3174     if (NULL != socket->write_handle)
3175       LOG (GNUNET_ERROR_TYPE_WARNING,
3176            "Existing write handle should be cancelled before shutting"
3177            " down writing\n");
3178     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3179     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3180                    GNUNET_NO);
3181     socket->transmit_closed = GNUNET_YES;
3182     break;
3183   case SHUT_RDWR:
3184     handle->operation = SHUT_RDWR;
3185     if (NULL != socket->write_handle)
3186       LOG (GNUNET_ERROR_TYPE_WARNING,
3187            "Existing write handle should be cancelled before shutting"
3188            " down writing\n");
3189     if (NULL != socket->read_handle)
3190       LOG (GNUNET_ERROR_TYPE_WARNING,
3191            "Existing read handle should be cancelled before shutting"
3192            " down reading\n");
3193     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3194     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3195     socket->transmit_closed = GNUNET_YES;
3196     socket->receive_closed = GNUNET_YES;
3197     break;
3198   default:
3199     LOG (GNUNET_ERROR_TYPE_WARNING,
3200          "GNUNET_STREAM_shutdown called with invalid value for "
3201          "parameter operation -- Ignoring\n");
3202     GNUNET_free (msg);
3203     GNUNET_free (handle);
3204     return NULL;
3205   }
3206   handle->close_msg_retransmission_task_id =
3207     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3208                                   &close_msg_retransmission_task,
3209                                   handle);
3210   return handle;
3211 }
3212
3213
3214 /**
3215  * Cancels a pending shutdown. Note that the shutdown messages may already be
3216  * sent and the stream is shutdown already for the operation given to
3217  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3218  * shutdown messages and frees the shutdown handle.
3219  *
3220  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3221  */
3222 void
3223 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3224 {
3225   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3226     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3227   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3228     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3229   handle->socket->shutdown_handle = NULL;
3230   GNUNET_free (handle);
3231 }
3232
3233
3234 /**
3235  * Closes the stream
3236  *
3237  * @param socket the stream socket
3238  */
3239 void
3240 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3241 {
3242   struct MessageQueue *head;
3243
3244   if (NULL != socket->read_handle)
3245   {
3246     LOG (GNUNET_ERROR_TYPE_WARNING,
3247          "Closing STREAM socket when a read handle is pending\n");
3248     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3249   }
3250   if (NULL != socket->write_handle)
3251   {
3252     LOG (GNUNET_ERROR_TYPE_WARNING,
3253          "Closing STREAM socket when a write handle is pending\n");
3254     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3255     //socket->write_handle = NULL;
3256   }
3257   /* Terminate the ack'ing task if they are still present */
3258   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3259   {
3260     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3261     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3262   }
3263   /* Terminate the control retransmission tasks */
3264   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3265   {
3266     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3267   }
3268   /* Clear Transmit handles */
3269   if (NULL != socket->transmit_handle)
3270   {
3271     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3272     socket->transmit_handle = NULL;
3273   }
3274   /* Clear existing message queue */
3275   while (NULL != (head = socket->queue_head)) {
3276     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3277                                  socket->queue_tail,
3278                                  head);
3279     GNUNET_free (head->message);
3280     GNUNET_free (head);
3281   }
3282   /* Close associated tunnel */
3283   if (NULL != socket->tunnel)
3284   {
3285     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3286     socket->tunnel = NULL;
3287   }
3288   /* Close mesh connection */
3289   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3290   {
3291     GNUNET_MESH_disconnect (socket->mesh);
3292     socket->mesh = NULL;
3293   }
3294   /* Close statistics connection */
3295   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3296     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3297   /* Release receive buffer */
3298   if (NULL != socket->receive_buffer)
3299   {
3300     GNUNET_free (socket->receive_buffer);
3301   }
3302   GNUNET_free (socket);
3303 }
3304
3305
3306 /**
3307  * Listens for stream connections for a specific application ports
3308  *
3309  * @param cfg the configuration to use
3310  * @param app_port the application port for which new streams will be accepted
3311  * @param listen_cb this function will be called when a peer tries to establish
3312  *            a stream with us
3313  * @param listen_cb_cls closure for listen_cb
3314  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3315  * @return listen socket, NULL for any error
3316  */
3317 struct GNUNET_STREAM_ListenSocket *
3318 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3319                       GNUNET_MESH_ApplicationType app_port,
3320                       GNUNET_STREAM_ListenCallback listen_cb,
3321                       void *listen_cb_cls,
3322                       ...)
3323 {
3324   struct GNUNET_STREAM_ListenSocket *lsocket;
3325   struct GNUNET_TIME_Relative listen_timeout;
3326   enum GNUNET_STREAM_Option option;
3327   va_list vargs;
3328   uint16_t payload_size;
3329
3330   GNUNET_assert (NULL != listen_cb);
3331   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3332   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3333   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3334   if (NULL == lsocket->lockmanager)
3335   {
3336     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3337     GNUNET_free (lsocket);
3338     return NULL;
3339   }
3340   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3341   /* Set defaults */
3342   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3343   lsocket->testing_active = GNUNET_NO;
3344   lsocket->listen_ok_cb = NULL;
3345   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3346   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3347   va_start (vargs, listen_cb_cls);
3348   do {
3349     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3350     switch (option)
3351     {
3352     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3353       lsocket->retransmit_timeout = va_arg (vargs,
3354                                             struct GNUNET_TIME_Relative);
3355       break;
3356     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3357       lsocket->testing_active = GNUNET_YES;
3358       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3359                                                                  uint32_t);
3360       break;
3361     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3362       listen_timeout = GNUNET_TIME_relative_multiply
3363         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3364       break;
3365     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3366       lsocket->listen_ok_cb = va_arg (vargs,
3367                                       GNUNET_STREAM_ListenSuccessCallback);
3368       break;
3369     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3370       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3371       GNUNET_assert (0 != payload_size);
3372       if (payload_size < lsocket->max_payload_size)
3373         lsocket->max_payload_size = payload_size;
3374       break;
3375     case GNUNET_STREAM_OPTION_END:
3376       break;
3377     }
3378   } while (GNUNET_STREAM_OPTION_END != option);
3379   va_end (vargs);
3380   lsocket->port = app_port;
3381   lsocket->listen_cb = listen_cb;
3382   lsocket->listen_cb_cls = listen_cb_cls;
3383   lsocket->locking_request = 
3384     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3385                                      (uint32_t) lsocket->port,
3386                                      &lock_status_change_cb, lsocket);
3387   lsocket->lockmanager_acquire_timeout_task =
3388     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3389                                   &lockmanager_acquire_timeout, lsocket);
3390   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3391                                                    lsocket->cfg);
3392   return lsocket;
3393 }
3394
3395
3396 /**
3397  * Closes the listen socket
3398  *
3399  * @param lsocket the listen socket
3400  */
3401 void
3402 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3403 {
3404   /* Close MESH connection */
3405   if (NULL != lsocket->mesh)
3406     GNUNET_MESH_disconnect (lsocket->mesh);
3407   if (NULL != lsocket->stat_handle)
3408     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3409   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3410   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3411     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3412   if (NULL != lsocket->locking_request)
3413     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3414   if (NULL != lsocket->lockmanager)
3415     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3416   GNUNET_free (lsocket);
3417 }
3418
3419
3420 /**
3421  * Tries to write the given data to the stream. The maximum size of data that
3422  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3423  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3424  * violation, however only the said number of maximum bytes will be written.
3425  *
3426  * @param socket the socket representing a stream
3427  * @param data the data buffer from where the data is written into the stream
3428  * @param size the number of bytes to be written from the data buffer
3429  * @param timeout the timeout period
3430  * @param write_cont the function to call upon writing some bytes into the
3431  *          stream 
3432  * @param write_cont_cls the closure
3433  *
3434  * @return handle to cancel the operation; if a previous write is pending or
3435  *           the stream has been shutdown for this operation then write_cont is
3436  *           immediately called and NULL is returned.
3437  */
3438 struct GNUNET_STREAM_IOWriteHandle *
3439 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3440                      const void *data,
3441                      size_t size,
3442                      struct GNUNET_TIME_Relative timeout,
3443                      GNUNET_STREAM_CompletionContinuation write_cont,
3444                      void *write_cont_cls)
3445 {
3446   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3447   struct GNUNET_STREAM_DataMessage *data_msg;
3448   const void *sweep;
3449   struct GNUNET_TIME_Relative ack_deadline;
3450   unsigned int num_needed_packets;
3451   unsigned int packet;
3452   uint32_t packet_size;
3453   uint32_t payload_size;
3454   uint16_t max_data_packet_size;
3455
3456   LOG (GNUNET_ERROR_TYPE_DEBUG,
3457        "%s\n", __func__);
3458   if (NULL != socket->write_handle)
3459   {
3460     GNUNET_break (0);
3461     return NULL;
3462   }
3463   switch (socket->state)
3464   {
3465   case STATE_TRANSMIT_CLOSED:
3466   case STATE_TRANSMIT_CLOSE_WAIT:
3467   case STATE_CLOSED:
3468   case STATE_CLOSE_WAIT:
3469     if (NULL != write_cont)
3470       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3471     LOG (GNUNET_ERROR_TYPE_DEBUG,
3472          "%s() END\n", __func__);
3473     return NULL;
3474   case STATE_INIT:
3475   case STATE_LISTEN:
3476   case STATE_HELLO_WAIT:
3477     if (NULL != write_cont)
3478       /* FIXME: GNUNET_STREAM_SYSERR?? */
3479       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3480     LOG (GNUNET_ERROR_TYPE_DEBUG,
3481          "%s() END\n", __func__);
3482     return NULL;
3483   case STATE_ESTABLISHED:
3484   case STATE_RECEIVE_CLOSED:
3485   case STATE_RECEIVE_CLOSE_WAIT:
3486     break;
3487   }
3488   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3489     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3490   num_needed_packets =
3491       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3492   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3493   io_handle->socket = socket;
3494   io_handle->write_cont = write_cont;
3495   io_handle->write_cont_cls = write_cont_cls;
3496   io_handle->size = size;
3497   io_handle->packets_sent = 0;
3498   sweep = data;
3499   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3500      determined from RTT */
3501   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3502   /* Divide the given buffer into packets for sending */
3503   max_data_packet_size =
3504       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3505   io_handle->max_ack_base_num = socket->write_sequence_number;
3506   for (packet=0; packet < num_needed_packets; packet++)
3507   {
3508     if ((packet + 1) * socket->max_payload_size < size) 
3509     {
3510       payload_size = socket->max_payload_size;
3511       packet_size = max_data_packet_size;
3512     }
3513     else 
3514     {
3515       payload_size = size - packet * socket->max_payload_size;
3516       packet_size = 
3517           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3518     }
3519     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3520     io_handle->messages[packet]->header.header.size = htons (packet_size);
3521     io_handle->messages[packet]->header.header.type =
3522       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3523     io_handle->messages[packet]->sequence_number =
3524       htonl (socket->write_sequence_number++);
3525     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3526     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3527        determined from RTT */
3528     io_handle->messages[packet]->ack_deadline =
3529       GNUNET_TIME_relative_hton (ack_deadline);
3530     data_msg = io_handle->messages[packet];
3531     /* Copy data from given buffer to the packet */
3532     memcpy (&data_msg[1], sweep, payload_size);
3533     sweep += payload_size;
3534     socket->write_offset += payload_size;
3535   }
3536   /* ack the last data message. FIXME: remove when we figure out how to do this
3537      using RTT */
3538   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3539       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3540   socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
3541   socket->write_handle = io_handle;
3542   write_data (socket);
3543   LOG (GNUNET_ERROR_TYPE_DEBUG,
3544        "%s() END\n", __func__);
3545   return io_handle;
3546 }
3547
3548
3549 /**
3550  * Tries to read data from the stream.
3551  *
3552  * @param socket the socket representing a stream
3553  * @param timeout the timeout period
3554  * @param proc function to call with data (once only)
3555  * @param proc_cls the closure for proc
3556  *
3557  * @return handle to cancel the operation; NULL is returned if: the stream has
3558  *           been shutdown for this type of opeartion (the DataProcessor is
3559  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3560  *           read handle is present (only one read handle per socket is present
3561  *           at any time)
3562  */
3563 struct GNUNET_STREAM_IOReadHandle *
3564 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3565                     struct GNUNET_TIME_Relative timeout,
3566                     GNUNET_STREAM_DataProcessor proc,
3567                     void *proc_cls)
3568 {
3569   struct GNUNET_STREAM_IOReadHandle *read_handle;
3570   
3571   LOG (GNUNET_ERROR_TYPE_DEBUG,
3572        "%s: %s()\n", 
3573        GNUNET_i2s (&socket->other_peer),
3574        __func__);
3575   /* Return NULL if there is already a read handle; the user has to cancel that
3576      first before continuing or has to wait until it is completed */
3577   if (NULL != socket->read_handle) 
3578     return NULL;
3579   GNUNET_assert (NULL != proc);
3580   switch (socket->state)
3581   {
3582   case STATE_RECEIVE_CLOSED:
3583   case STATE_RECEIVE_CLOSE_WAIT:
3584   case STATE_CLOSED:
3585   case STATE_CLOSE_WAIT:
3586     LOG (GNUNET_ERROR_TYPE_DEBUG,
3587          "%s: %s() END\n",
3588          GNUNET_i2s (&socket->other_peer),
3589          __func__);
3590     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3591     return NULL;
3592   default:
3593     break;
3594   }
3595   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3596   read_handle->proc = proc;
3597   read_handle->proc_cls = proc_cls;
3598   read_handle->socket = socket;
3599   socket->read_handle = read_handle;
3600   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3601                                           0))
3602     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3603                                                           socket);   
3604   read_handle->read_io_timeout_task_id =
3605       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3606   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3607        GNUNET_i2s (&socket->other_peer), __func__);
3608   return read_handle;
3609 }
3610
3611
3612 /**
3613  * Cancel pending write operation.
3614  *
3615  * @param ioh handle to operation to cancel
3616  */
3617 void
3618 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3619 {
3620   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3621   unsigned int packet;
3622
3623   GNUNET_assert (NULL != socket->write_handle);
3624   GNUNET_assert (socket->write_handle == ioh);
3625   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3626   {
3627     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3628     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3629   }
3630   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3631   {
3632     if (NULL == ioh->messages[packet]) break;
3633     GNUNET_free (ioh->messages[packet]);
3634   }      
3635   GNUNET_free (socket->write_handle);
3636   socket->write_handle = NULL;
3637 }
3638
3639
3640 /**
3641  * Cancel pending read operation.
3642  *
3643  * @param ioh handle to operation to cancel
3644  */
3645 void
3646 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3647 {
3648   struct GNUNET_STREAM_Socket *socket;
3649   
3650   socket = ioh->socket;
3651   GNUNET_assert (NULL != socket->read_handle);
3652   GNUNET_assert (ioh == socket->read_handle);
3653   /* Read io time task should be there; if it is already executed then this
3654   read handle is not valid; However upon scheduler shutdown the read io task
3655   may be executed before */
3656   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3657     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3658   /* reading task may be present; if so we have to stop it */
3659   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3660     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3661   GNUNET_free (ioh);
3662   socket->read_handle = NULL;
3663 }
3664
3665 /* end of stream_api.c */