2180f919ac1b9e34c8af2642494d517f0c4b599c
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  **/
31
32 /**
33  * @file stream/stream_api.c
34  * @brief Implementation of the stream library
35  * @author Sree Harsha Totakura
36  */
37
38
39 #include "platform.h"
40 #include "gnunet_common.h"
41 #include "gnunet_crypto_lib.h"
42 #include "gnunet_stream_lib.h"
43 #include "gnunet_testing_lib.h"
44 #include "stream_protocol.h"
45
46
47 /**
48  * The maximum packet size of a stream packet
49  */
50 #define MAX_PACKET_SIZE 64000
51
52 /**
53  * Receive buffer
54  */
55 #define RECEIVE_BUFFER_SIZE 4096000
56
57 /**
58  * The maximum payload a data message packet can carry
59  */
60 static size_t max_payload_size = 
61   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
62
63 /**
64  * states in the Protocol
65  */
66 enum State
67   {
68     /**
69      * Client initialization state
70      */
71     STATE_INIT,
72
73     /**
74      * Listener initialization state 
75      */
76     STATE_LISTEN,
77
78     /**
79      * Pre-connection establishment state
80      */
81     STATE_HELLO_WAIT,
82
83     /**
84      * State where a connection has been established
85      */
86     STATE_ESTABLISHED,
87
88     /**
89      * State where the socket is closed on our side and waiting to be ACK'ed
90      */
91     STATE_RECEIVE_CLOSE_WAIT,
92
93     /**
94      * State where the socket is closed for reading
95      */
96     STATE_RECEIVE_CLOSED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_TRANSMIT_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for writing
105      */
106     STATE_TRANSMIT_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed
115      */
116     STATE_CLOSED 
117   };
118
119
120 /**
121  * Functions of this type are called when a message is written
122  *
123  * @param cls the closure from queue_message
124  * @param socket the socket the written message was bound to
125  */
126 typedef void (*SendFinishCallback) (void *cls,
127                                     struct GNUNET_STREAM_Socket *socket);
128
129
130 /**
131  * The send message queue
132  */
133 struct MessageQueue
134 {
135   /**
136    * The message
137    */
138   struct GNUNET_STREAM_MessageHeader *message;
139
140   /**
141    * Callback to be called when the message is sent
142    */
143   SendFinishCallback finish_cb;
144
145   /**
146    * The closure for finish_cb
147    */
148   void *finish_cb_cls;
149
150   /**
151    * The next message in queue. Should be NULL in the last message
152    */
153   struct MessageQueue *next;
154
155   /**
156    * The next message in queue. Should be NULL in the first message
157    */
158   struct MessageQueue *prev;
159 };
160
161
162 /**
163  * The STREAM Socket Handler
164  */
165 struct GNUNET_STREAM_Socket
166 {
167   /**
168    * Retransmission timeout
169    */
170   struct GNUNET_TIME_Relative retransmit_timeout;
171
172   /**
173    * The Acknowledgement Bitmap
174    */
175   GNUNET_STREAM_AckBitmap ack_bitmap;
176
177   /**
178    * Time when the Acknowledgement was queued
179    */
180   struct GNUNET_TIME_Absolute ack_time_registered;
181
182   /**
183    * Queued Acknowledgement deadline
184    */
185   struct GNUNET_TIME_Relative ack_time_deadline;
186
187   /**
188    * The mesh handle
189    */
190   struct GNUNET_MESH_Handle *mesh;
191
192   /**
193    * The mesh tunnel handle
194    */
195   struct GNUNET_MESH_Tunnel *tunnel;
196
197   /**
198    * Stream open closure
199    */
200   void *open_cls;
201
202   /**
203    * Stream open callback
204    */
205   GNUNET_STREAM_OpenCallback open_cb;
206
207   /**
208    * The current transmit handle (if a pending transmit request exists)
209    */
210   struct GNUNET_MESH_TransmitHandle *transmit_handle;
211
212   /**
213    * The current message associated with the transmit handle
214    */
215   struct MessageQueue *queue_head;
216
217   /**
218    * The queue tail, should always point to the last message in queue
219    */
220   struct MessageQueue *queue_tail;
221
222   /**
223    * The write IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_IOWriteHandle *write_handle;
226
227   /**
228    * The read IO_handle associated with this socket
229    */
230   struct GNUNET_STREAM_IOReadHandle *read_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * Task identifier for the read io timeout task
245    */
246   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
247
248   /**
249    * Task identifier for retransmission task after timeout
250    */
251   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
252
253   /**
254    * The task for sending timely Acks
255    */
256   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
257
258   /**
259    * Task scheduled to continue a read operation.
260    */
261   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
262
263   /**
264    * The state of the protocol associated with this socket
265    */
266   enum State state;
267
268   /**
269    * The status of the socket
270    */
271   enum GNUNET_STREAM_Status status;
272
273   /**
274    * The number of previous timeouts; FIXME: currently not used
275    */
276   unsigned int retries;
277
278   /**
279    * The peer identity of the peer at the other end of the stream
280    */
281   GNUNET_PEER_Id other_peer;
282
283   /**
284    * Our Peer Identity (for debugging)
285    */
286   GNUNET_PEER_Id our_id;
287
288   /**
289    * The application port number (type: uint32_t)
290    */
291   GNUNET_MESH_ApplicationType app_port;
292
293   /**
294    * The session id associated with this stream connection
295    * FIXME: Not used currently, may be removed
296    */
297   uint32_t session_id;
298
299   /**
300    * Write sequence number. Set to random when sending HELLO(client) and
301    * HELLO_ACK(server) 
302    */
303   uint32_t write_sequence_number;
304
305   /**
306    * Read sequence number. This number's value is determined during handshake
307    */
308   uint32_t read_sequence_number;
309
310   /**
311    * The receiver buffer size
312    */
313   uint32_t receive_buffer_size;
314
315   /**
316    * The receiver buffer boundaries
317    */
318   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
319
320   /**
321    * receiver's available buffer after the last acknowledged packet
322    */
323   uint32_t receiver_window_available;
324
325   /**
326    * The offset pointer used during write operation
327    */
328   uint32_t write_offset;
329
330   /**
331    * The offset after which we are expecting data
332    */
333   uint32_t read_offset;
334
335   /**
336    * The offset upto which user has read from the received buffer
337    */
338   uint32_t copy_offset;
339 };
340
341
342 /**
343  * A socket for listening
344  */
345 struct GNUNET_STREAM_ListenSocket
346 {
347   /**
348    * The mesh handle
349    */
350   struct GNUNET_MESH_Handle *mesh;
351
352   /**
353    * The callback function which is called after successful opening socket
354    */
355   GNUNET_STREAM_ListenCallback listen_cb;
356
357   /**
358    * The call back closure
359    */
360   void *listen_cb_cls;
361
362   /**
363    * Our interned Peer's identity
364    */
365   GNUNET_PEER_Id our_id;
366
367   /**
368    * The service port
369    * FIXME: Remove if not required!
370    */
371   GNUNET_MESH_ApplicationType port;
372 };
373
374
375 /**
376  * The IO Write Handle
377  */
378 struct GNUNET_STREAM_IOWriteHandle
379 {
380   /**
381    * The socket to which this write handle is associated
382    */
383   struct GNUNET_STREAM_Socket *socket;
384
385   /**
386    * The packet_buffers associated with this Handle
387    */
388   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
389
390   /**
391    * The write continuation callback
392    */
393   GNUNET_STREAM_CompletionContinuation write_cont;
394
395   /**
396    * Write continuation closure
397    */
398   void *write_cont_cls;
399
400   /**
401    * The bitmap of this IOHandle; Corresponding bit for a message is set when
402    * it has been acknowledged by the receiver
403    */
404   GNUNET_STREAM_AckBitmap ack_bitmap;
405
406   /**
407    * Number of bytes in this write handle
408    */
409   size_t size;
410 };
411
412
413 /**
414  * The IO Read Handle
415  */
416 struct GNUNET_STREAM_IOReadHandle
417 {
418   /**
419    * Callback for the read processor
420    */
421   GNUNET_STREAM_DataProcessor proc;
422
423   /**
424    * The closure pointer for the read processor callback
425    */
426   void *proc_cls;
427 };
428
429
430 /**
431  * Handle for Shutdown
432  */
433 struct GNUNET_STREAM_ShutdownHandle
434 {
435   /**
436    * The socket associated with this shutdown handle
437    */
438   struct GNUNET_STREAM_Socket *socket;
439
440   /**
441    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
442    */
443   int operation;
444 };
445
446
447 /**
448  * Default value in seconds for various timeouts
449  */
450 static unsigned int default_timeout = 10;
451
452
453 /**
454  * Callback function for sending queued message
455  *
456  * @param cls closure the socket
457  * @param size number of bytes available in buf
458  * @param buf where the callee should write the message
459  * @return number of bytes written to buf
460  */
461 static size_t
462 send_message_notify (void *cls, size_t size, void *buf)
463 {
464   struct GNUNET_STREAM_Socket *socket = cls;
465   struct GNUNET_PeerIdentity target;
466   struct MessageQueue *head;
467   size_t ret;
468
469   socket->transmit_handle = NULL; /* Remove the transmit handle */
470   head = socket->queue_head;
471   if (NULL == head)
472     return 0; /* just to be safe */
473   GNUNET_PEER_resolve (socket->other_peer, &target);
474   if (0 == size)                /* request timed out */
475     {
476       socket->retries++;
477       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
478                   "Message sending timed out. Retry %d \n",
479                   socket->retries);
480       socket->transmit_handle = 
481         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
482                                            0, /* Corking */
483                                            1, /* Priority */
484                                            /* FIXME: exponential backoff */
485                                            socket->retransmit_timeout,
486                                            &target,
487                                            ntohs (head->message->header.size),
488                                            &send_message_notify,
489                                            socket);
490       return 0;
491     }
492
493   ret = ntohs (head->message->header.size);
494   GNUNET_assert (size >= ret);
495   memcpy (buf, head->message, ret);
496   if (NULL != head->finish_cb)
497     {
498       head->finish_cb (head->finish_cb_cls, socket);
499     }
500   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
501                                socket->queue_tail,
502                                head);
503   GNUNET_free (head->message);
504   GNUNET_free (head);
505   head = socket->queue_head;
506   if (NULL != head)    /* more pending messages to send */
507     {
508       socket->retries = 0;
509       socket->transmit_handle = 
510         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
511                                            0, /* Corking */
512                                            1, /* Priority */
513                                            /* FIXME: exponential backoff */
514                                            socket->retransmit_timeout,
515                                            &target,
516                                            ntohs (head->message->header.size),
517                                            &send_message_notify,
518                                            socket);
519     }
520   return ret;
521 }
522
523
524 /**
525  * Queues a message for sending using the mesh connection of a socket
526  *
527  * @param socket the socket whose mesh connection is used
528  * @param message the message to be sent
529  * @param finish_cb the callback to be called when the message is sent
530  * @param finish_cb_cls the closure for the callback
531  */
532 static void
533 queue_message (struct GNUNET_STREAM_Socket *socket,
534                struct GNUNET_STREAM_MessageHeader *message,
535                SendFinishCallback finish_cb,
536                void *finish_cb_cls)
537 {
538   struct MessageQueue *queue_entity;
539   struct GNUNET_PeerIdentity target;
540
541   GNUNET_assert 
542     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
543      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
544
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546               "%x: Queueing message of type %d and size %d\n",
547               socket->our_id,
548               ntohs (message->header.type),
549               ntohs (message->header.size));
550   GNUNET_assert (NULL != message);
551   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
552   queue_entity->message = message;
553   queue_entity->finish_cb = finish_cb;
554   queue_entity->finish_cb_cls = finish_cb_cls;
555   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
556                                     socket->queue_tail,
557                                     queue_entity);
558   if (NULL == socket->transmit_handle)
559   {
560     socket->retries = 0;
561     GNUNET_PEER_resolve (socket->other_peer, &target);
562     socket->transmit_handle = 
563       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
564                                          0, /* Corking */
565                                          1, /* Priority */
566                                          socket->retransmit_timeout,
567                                          &target,
568                                          ntohs (message->header.size),
569                                          &send_message_notify,
570                                          socket);
571   }
572 }
573
574
575 /**
576  * Copies a message and queues it for sending using the mesh connection of
577  * given socket 
578  *
579  * @param socket the socket whose mesh connection is used
580  * @param message the message to be sent
581  * @param finish_cb the callback to be called when the message is sent
582  * @param finish_cb_cls the closure for the callback
583  */
584 static void
585 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
586                         const struct GNUNET_STREAM_MessageHeader *message,
587                         SendFinishCallback finish_cb,
588                         void *finish_cb_cls)
589 {
590   struct GNUNET_STREAM_MessageHeader *msg_copy;
591   uint16_t size;
592   
593   size = ntohs (message->header.size);
594   msg_copy = GNUNET_malloc (size);
595   memcpy (msg_copy, message, size);
596   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
597 }
598
599
600 /**
601  * Callback function for sending ack message
602  *
603  * @param cls closure the ACK message created in ack_task
604  * @param size number of bytes available in buffer
605  * @param buf where the callee should write the message
606  * @return number of bytes written to buf
607  */
608 static size_t
609 send_ack_notify (void *cls, size_t size, void *buf)
610 {
611   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
612
613   if (0 == size)
614     {
615       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616                   "%s called with size 0\n", __func__);
617       return 0;
618     }
619   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
620   
621   size = ntohs (ack_msg->header.header.size);
622   memcpy (buf, ack_msg, size);
623   return size;
624 }
625
626 /**
627  * Writes data using the given socket. The amount of data written is limited by
628  * the receiver_window_size
629  *
630  * @param socket the socket to use
631  */
632 static void 
633 write_data (struct GNUNET_STREAM_Socket *socket);
634
635 /**
636  * Task for retransmitting data messages if they aren't ACK before their ack
637  * deadline 
638  *
639  * @param cls the socket
640  * @param tc the Task context
641  */
642 static void
643 retransmission_timeout_task (void *cls,
644                              const struct GNUNET_SCHEDULER_TaskContext *tc)
645 {
646   struct GNUNET_STREAM_Socket *socket = cls;
647   
648   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
649     return;
650
651   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652               "%x: Retransmitting DATA...\n", socket->our_id);
653   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
654   write_data (socket);
655 }
656
657
658 /**
659  * Task for sending ACK message
660  *
661  * @param cls the socket
662  * @param tc the Task context
663  */
664 static void
665 ack_task (void *cls,
666           const struct GNUNET_SCHEDULER_TaskContext *tc)
667 {
668   struct GNUNET_STREAM_Socket *socket = cls;
669   struct GNUNET_STREAM_AckMessage *ack_msg;
670   struct GNUNET_PeerIdentity target;
671
672   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
673     {
674       return;
675     }
676
677   socket->ack_task_id = 0;
678
679   /* Create the ACK Message */
680   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
681   ack_msg->header.header.size = htons (sizeof (struct 
682                                                GNUNET_STREAM_AckMessage));
683   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
684   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
685   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
686   ack_msg->receive_window_remaining = 
687     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
688
689   GNUNET_PEER_resolve (socket->other_peer, &target);
690   /* Request MESH for sending ACK */
691   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
692                                      0, /* Corking */
693                                      1, /* Priority */
694                                      socket->retransmit_timeout,
695                                      &target,
696                                      ntohs (ack_msg->header.header.size),
697                                      &send_ack_notify,
698                                      ack_msg);
699
700   
701 }
702
703
704 /**
705  * Function to modify a bit in GNUNET_STREAM_AckBitmap
706  *
707  * @param bitmap the bitmap to modify
708  * @param bit the bit number to modify
709  * @param value GNUNET_YES to on, GNUNET_NO to off
710  */
711 static void
712 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
713                       unsigned int bit, 
714                       int value)
715 {
716   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
717   if (GNUNET_YES == value)
718     *bitmap |= (1LL << bit);
719   else
720     *bitmap &= ~(1LL << bit);
721 }
722
723
724 /**
725  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
726  *
727  * @param bitmap address of the bitmap that has to be checked
728  * @param bit the bit number to check
729  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
730  */
731 static uint8_t
732 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
733                       unsigned int bit)
734 {
735   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
736   return 0 != (*bitmap & (1LL << bit));
737 }
738
739
740 /**
741  * Writes data using the given socket. The amount of data written is limited by
742  * the receiver_window_size
743  *
744  * @param socket the socket to use
745  */
746 static void 
747 write_data (struct GNUNET_STREAM_Socket *socket)
748 {
749   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
750   int packet;                   /* Although an int, should never be negative */
751   int ack_packet;
752
753   ack_packet = -1;
754   /* Find the last acknowledged packet */
755   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
756     {      
757       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
758                                               packet))
759         ack_packet = packet;        
760       else if (NULL == io_handle->messages[packet])
761         break;
762     }
763   /* Resend packets which weren't ack'ed */
764   for (packet=0; packet < ack_packet; packet++)
765     {
766       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
767                                              packet))
768         {
769           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
770                       "%x: Placing DATA message with sequence %u in send queue\n",
771                       socket->our_id,
772                       ntohl (io_handle->messages[packet]->sequence_number));
773
774           copy_and_queue_message (socket,
775                                   &io_handle->messages[packet]->header,
776                                   NULL,
777                                   NULL);
778         }
779     }
780   packet = ack_packet + 1;
781   /* Now send new packets if there is enough buffer space */
782   while ( (NULL != io_handle->messages[packet]) &&
783           (socket->receiver_window_available 
784            >= ntohs (io_handle->messages[packet]->header.header.size)) )
785     {
786       socket->receiver_window_available -= 
787         ntohs (io_handle->messages[packet]->header.header.size);
788       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789                   "%x: Placing DATA message with sequence %u in send queue\n",
790                   socket->our_id,
791                   ntohl (io_handle->messages[packet]->sequence_number));
792       copy_and_queue_message (socket,
793                               &io_handle->messages[packet]->header,
794                               NULL,
795                               NULL);
796       packet++;
797     }
798
799   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
800     socket->retransmission_timeout_task_id = 
801       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
802                                     (GNUNET_TIME_UNIT_SECONDS, 8),
803                                     &retransmission_timeout_task,
804                                     socket);
805 }
806
807
808 /**
809  * Task for calling the read processor
810  *
811  * @param cls the socket
812  * @param tc the task context
813  */
814 static void
815 call_read_processor (void *cls,
816                      const struct GNUNET_SCHEDULER_TaskContext *tc)
817 {
818   struct GNUNET_STREAM_Socket *socket = cls;
819   size_t read_size;
820   size_t valid_read_size;
821   unsigned int packet;
822   uint32_t sequence_increase;
823   uint32_t offset_increase;
824
825   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
826   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
827     return;
828
829   GNUNET_assert (NULL != socket->read_handle);
830   GNUNET_assert (NULL != socket->read_handle->proc);
831
832   /* Check the bitmap for any holes */
833   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
834     {
835       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
836                                              packet))
837         break;
838     }
839   /* We only call read processor if we have the first packet */
840   GNUNET_assert (0 < packet);
841
842   valid_read_size = 
843     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
844
845   GNUNET_assert (0 != valid_read_size);
846
847   /* Cancel the read_io_timeout_task */
848   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
849   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
850
851   /* Call the data processor */
852   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853               "%x: Calling read processor\n",
854               socket->our_id);
855   read_size = 
856     socket->read_handle->proc (socket->read_handle->proc_cls,
857                                socket->status,
858                                socket->receive_buffer + socket->copy_offset,
859                                valid_read_size);
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "%x: Read processor read %d bytes\n",
862               socket->our_id,
863               read_size);
864   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865               "%x: Read processor completed successfully\n",
866               socket->our_id);
867
868   /* Free the read handle */
869   GNUNET_free (socket->read_handle);
870   socket->read_handle = NULL;
871
872   GNUNET_assert (read_size <= valid_read_size);
873   socket->copy_offset += read_size;
874
875   /* Determine upto which packet we can remove from the buffer */
876   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
877     {
878       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
879         { packet++; break; }
880       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
881         break;
882     }
883
884   /* If no packets can be removed we can't move the buffer */
885   if (0 == packet) return;
886
887   sequence_increase = packet;
888   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889               "%x: Sequence increase after read processor completion: %u\n",
890               socket->our_id,
891               sequence_increase);
892
893   /* Shift the data in the receive buffer */
894   memmove (socket->receive_buffer,
895            socket->receive_buffer 
896            + socket->receive_buffer_boundaries[sequence_increase-1],
897            socket->receive_buffer_size
898            - socket->receive_buffer_boundaries[sequence_increase-1]);
899   
900   /* Shift the bitmap */
901   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
902   
903   /* Set read_sequence_number */
904   socket->read_sequence_number += sequence_increase;
905   
906   /* Set read_offset */
907   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
908   socket->read_offset += offset_increase;
909
910   /* Fix copy_offset */
911   GNUNET_assert (offset_increase <= socket->copy_offset);
912   socket->copy_offset -= offset_increase;
913   
914   /* Fix relative boundaries */
915   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
916     {
917       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
918         {
919           socket->receive_buffer_boundaries[packet] = 
920             socket->receive_buffer_boundaries[packet + sequence_increase] 
921             - offset_increase;
922         }
923       else
924         socket->receive_buffer_boundaries[packet] = 0;
925     }
926 }
927
928
929 /**
930  * Cancels the existing read io handle
931  *
932  * @param cls the closure from the SCHEDULER call
933  * @param tc the task context
934  */
935 static void
936 read_io_timeout (void *cls, 
937                 const struct GNUNET_SCHEDULER_TaskContext *tc)
938 {
939   struct GNUNET_STREAM_Socket *socket = cls;
940   GNUNET_STREAM_DataProcessor proc;
941   void *proc_cls;
942
943   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
944   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
945   {
946     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947                 "%x: Read task timedout - Cancelling it\n",
948                 socket->our_id);
949     GNUNET_SCHEDULER_cancel (socket->read_task_id);
950     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
951   }
952   GNUNET_assert (NULL != socket->read_handle);
953   proc = socket->read_handle->proc;
954   proc_cls = socket->read_handle->proc_cls;
955
956   GNUNET_free (socket->read_handle);
957   socket->read_handle = NULL;
958   /* Call the read processor to signal timeout */
959   proc (proc_cls,
960         GNUNET_STREAM_TIMEOUT,
961         NULL,
962         0);
963 }
964
965
966 /**
967  * Handler for DATA messages; Same for both client and server
968  *
969  * @param socket the socket through which the ack was received
970  * @param tunnel connection to the other end
971  * @param sender who sent the message
972  * @param msg the data message
973  * @param atsi performance data for the connection
974  * @return GNUNET_OK to keep the connection open,
975  *         GNUNET_SYSERR to close it (signal serious error)
976  */
977 static int
978 handle_data (struct GNUNET_STREAM_Socket *socket,
979              struct GNUNET_MESH_Tunnel *tunnel,
980              const struct GNUNET_PeerIdentity *sender,
981              const struct GNUNET_STREAM_DataMessage *msg,
982              const struct GNUNET_ATS_Information*atsi)
983 {
984   const void *payload;
985   uint32_t bytes_needed;
986   uint32_t relative_offset;
987   uint32_t relative_sequence_number;
988   uint16_t size;
989
990   size = htons (msg->header.header.size);
991   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
992     {
993       GNUNET_break_op (0);
994       return GNUNET_SYSERR;
995     }
996
997   if (GNUNET_PEER_search (sender) != socket->other_peer)
998     {
999       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000                   "%x: Received DATA from non-confirming peer\n",
1001                   socket->our_id);
1002       return GNUNET_YES;
1003     }
1004
1005   switch (socket->state)
1006     {
1007     case STATE_ESTABLISHED:
1008     case STATE_TRANSMIT_CLOSED:
1009     case STATE_TRANSMIT_CLOSE_WAIT:
1010       
1011       /* check if the message's sequence number is in the range we are
1012          expecting */
1013       relative_sequence_number = 
1014         ntohl (msg->sequence_number) - socket->read_sequence_number;
1015       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1016         {
1017           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018                       "%x: Ignoring received message with sequence number %u\n",
1019                       socket->our_id,
1020                       ntohl (msg->sequence_number));
1021           /* Start ACK sending task if one is not already present */
1022           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1023             {
1024               socket->ack_task_id = 
1025                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1026                                               (msg->ack_deadline),
1027                                               &ack_task,
1028                                               socket);
1029             }
1030           return GNUNET_YES;
1031         }
1032       
1033       /* Check if we have already seen this message */
1034       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1035                                               relative_sequence_number))
1036         {
1037           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                       "%x: Ignoring already received message with sequence "
1039                       "number %u\n",
1040                       socket->our_id,
1041                       ntohl (msg->sequence_number));
1042           /* Start ACK sending task if one is not already present */
1043           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1044             {
1045               socket->ack_task_id = 
1046                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1047                                               (msg->ack_deadline),
1048                                               &ack_task,
1049                                               socket);
1050             }
1051           return GNUNET_YES;
1052         }
1053
1054       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055                   "%x: Receiving DATA with sequence number: %u and size: %d "
1056                   "from %x\n",
1057                   socket->our_id,
1058                   ntohl (msg->sequence_number),
1059                   ntohs (msg->header.header.size),
1060                   socket->other_peer);
1061
1062       /* Check if we have to allocate the buffer */
1063       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1064       relative_offset = ntohl (msg->offset) - socket->read_offset;
1065       bytes_needed = relative_offset + size;
1066       if (bytes_needed > socket->receive_buffer_size)
1067         {
1068           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1069             {
1070               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1071                                                        bytes_needed);
1072               socket->receive_buffer_size = bytes_needed;
1073             }
1074           else
1075             {
1076               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077                           "%x: Cannot accommodate packet %d as buffer is",
1078                           "full\n",
1079                           socket->our_id,
1080                           ntohl (msg->sequence_number));
1081               return GNUNET_YES;
1082             }
1083         }
1084       
1085       /* Copy Data to buffer */
1086       payload = &msg[1];
1087       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1088       memcpy (socket->receive_buffer + relative_offset,
1089               payload,
1090               size);
1091       socket->receive_buffer_boundaries[relative_sequence_number] = 
1092         relative_offset + size;
1093       
1094       /* Modify the ACK bitmap */
1095       ackbitmap_modify_bit (&socket->ack_bitmap,
1096                             relative_sequence_number,
1097                             GNUNET_YES);
1098
1099       /* Start ACK sending task if one is not already present */
1100       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1101        {
1102          socket->ack_task_id = 
1103            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1104                                          (msg->ack_deadline),
1105                                          &ack_task,
1106                                          socket);
1107        }
1108
1109       if ((NULL != socket->read_handle) /* A read handle is waiting */
1110           /* There is no current read task */
1111           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1112           /* We have the first packet */
1113           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1114                                                  0)))
1115         {
1116           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1117                       "%x: Scheduling read processor\n",
1118                       socket->our_id);
1119
1120           socket->read_task_id = 
1121             GNUNET_SCHEDULER_add_now (&call_read_processor,
1122                                       socket);
1123         }
1124       
1125       break;
1126
1127     default:
1128       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129                   "%x: Received data message when it cannot be handled\n",
1130                   socket->our_id);
1131       break;
1132     }
1133   return GNUNET_YES;
1134 }
1135
1136
1137 /**
1138  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1139  *
1140  * @param cls the socket (set from GNUNET_MESH_connect)
1141  * @param tunnel connection to the other end
1142  * @param tunnel_ctx place to store local state associated with the tunnel
1143  * @param sender who sent the message
1144  * @param message the actual message
1145  * @param atsi performance data for the connection
1146  * @return GNUNET_OK to keep the connection open,
1147  *         GNUNET_SYSERR to close it (signal serious error)
1148  */
1149 static int
1150 client_handle_data (void *cls,
1151              struct GNUNET_MESH_Tunnel *tunnel,
1152              void **tunnel_ctx,
1153              const struct GNUNET_PeerIdentity *sender,
1154              const struct GNUNET_MessageHeader *message,
1155              const struct GNUNET_ATS_Information*atsi)
1156 {
1157   struct GNUNET_STREAM_Socket *socket = cls;
1158
1159   return handle_data (socket, 
1160                       tunnel, 
1161                       sender, 
1162                       (const struct GNUNET_STREAM_DataMessage *) message, 
1163                       atsi);
1164 }
1165
1166
1167 /**
1168  * Callback to set state to ESTABLISHED
1169  *
1170  * @param cls the closure from queue_message FIXME: document
1171  * @param socket the socket to requiring state change
1172  */
1173 static void
1174 set_state_established (void *cls,
1175                        struct GNUNET_STREAM_Socket *socket)
1176 {
1177   struct GNUNET_PeerIdentity initiator_pid;
1178
1179   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1180               "%x: Attaining ESTABLISHED state\n",
1181               socket->our_id);
1182   socket->write_offset = 0;
1183   socket->read_offset = 0;
1184   socket->state = STATE_ESTABLISHED;
1185   /* FIXME: What if listen_cb is NULL */
1186   if (NULL != socket->lsocket)
1187     {
1188       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1189       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190                   "%x: Calling listen callback\n",
1191                   socket->our_id);
1192       if (GNUNET_SYSERR == 
1193           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1194                                       socket,
1195                                       &initiator_pid))
1196         {
1197           socket->state = STATE_CLOSED;
1198           /* FIXME: We should close in a decent way */
1199           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1200           GNUNET_free (socket);
1201         }
1202     }
1203   else if (socket->open_cb)
1204     socket->open_cb (socket->open_cls, socket);
1205 }
1206
1207
1208 /**
1209  * Callback to set state to HELLO_WAIT
1210  *
1211  * @param cls the closure from queue_message
1212  * @param socket the socket to requiring state change
1213  */
1214 static void
1215 set_state_hello_wait (void *cls,
1216                       struct GNUNET_STREAM_Socket *socket)
1217 {
1218   GNUNET_assert (STATE_INIT == socket->state);
1219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1220               "%x: Attaining HELLO_WAIT state\n",
1221               socket->our_id);
1222   socket->state = STATE_HELLO_WAIT;
1223 }
1224
1225
1226 /**
1227  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1228  * socket
1229  *
1230  * @param socket the socket for which this HelloAckMessage has to be generated
1231  * @return the HelloAckMessage
1232  */
1233 static struct GNUNET_STREAM_HelloAckMessage *
1234 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1235 {
1236   struct GNUNET_STREAM_HelloAckMessage *msg;
1237
1238   /* Get the random sequence number */
1239   socket->write_sequence_number = 
1240     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1242               "%x: Generated write sequence number %u\n",
1243               socket->our_id,
1244               (unsigned int) socket->write_sequence_number);
1245   
1246   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1247   msg->header.header.size = 
1248     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1249   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1250   msg->sequence_number = htonl (socket->write_sequence_number);
1251   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1252
1253   return msg;
1254 }
1255
1256
1257 /**
1258  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1259  *
1260  * @param cls the socket (set from GNUNET_MESH_connect)
1261  * @param tunnel connection to the other end
1262  * @param tunnel_ctx this is NULL
1263  * @param sender who sent the message
1264  * @param message the actual message
1265  * @param atsi performance data for the connection
1266  * @return GNUNET_OK to keep the connection open,
1267  *         GNUNET_SYSERR to close it (signal serious error)
1268  */
1269 static int
1270 client_handle_hello_ack (void *cls,
1271                          struct GNUNET_MESH_Tunnel *tunnel,
1272                          void **tunnel_ctx,
1273                          const struct GNUNET_PeerIdentity *sender,
1274                          const struct GNUNET_MessageHeader *message,
1275                          const struct GNUNET_ATS_Information*atsi)
1276 {
1277   struct GNUNET_STREAM_Socket *socket = cls;
1278   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1279   struct GNUNET_STREAM_HelloAckMessage *reply;
1280
1281   if (GNUNET_PEER_search (sender) != socket->other_peer)
1282     {
1283       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284                   "%x: Received HELLO_ACK from non-confirming peer\n",
1285                   socket->our_id);
1286       return GNUNET_YES;
1287     }
1288   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1290               "%x: Received HELLO_ACK from %x\n",
1291               socket->our_id,
1292               socket->other_peer);
1293
1294   GNUNET_assert (socket->tunnel == tunnel);
1295   switch (socket->state)
1296   {
1297   case STATE_HELLO_WAIT:
1298     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1299     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300                 "%x: Read sequence number %u\n",
1301                 socket->our_id,
1302                 (unsigned int) socket->read_sequence_number);
1303     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1304     reply = generate_hello_ack_msg (socket);
1305     queue_message (socket,
1306                    &reply->header, 
1307                    &set_state_established, 
1308                    NULL);      
1309     return GNUNET_OK;
1310   case STATE_ESTABLISHED:
1311   case STATE_RECEIVE_CLOSE_WAIT:
1312     // call statistics (# ACKs ignored++)
1313     return GNUNET_OK;
1314   case STATE_INIT:
1315   default:
1316     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1318                 socket->our_id,
1319                 socket->other_peer,
1320                 socket->state);
1321     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1322     return GNUNET_SYSERR;
1323   }
1324
1325 }
1326
1327
1328 /**
1329  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1330  *
1331  * @param cls the socket (set from GNUNET_MESH_connect)
1332  * @param tunnel connection to the other end
1333  * @param tunnel_ctx this is NULL
1334  * @param sender who sent the message
1335  * @param message the actual message
1336  * @param atsi performance data for the connection
1337  * @return GNUNET_OK to keep the connection open,
1338  *         GNUNET_SYSERR to close it (signal serious error)
1339  */
1340 static int
1341 client_handle_reset (void *cls,
1342                      struct GNUNET_MESH_Tunnel *tunnel,
1343                      void **tunnel_ctx,
1344                      const struct GNUNET_PeerIdentity *sender,
1345                      const struct GNUNET_MessageHeader *message,
1346                      const struct GNUNET_ATS_Information*atsi)
1347 {
1348   struct GNUNET_STREAM_Socket *socket = cls;
1349
1350   return GNUNET_OK;
1351 }
1352
1353
1354 /**
1355  * Common message handler for handling TRANSMIT_CLOSE messages
1356  *
1357  * @param socket the socket through which the ack was received
1358  * @param tunnel connection to the other end
1359  * @param sender who sent the message
1360  * @param msg the transmit close message
1361  * @param atsi performance data for the connection
1362  * @return GNUNET_OK to keep the connection open,
1363  *         GNUNET_SYSERR to close it (signal serious error)
1364  */
1365 static int
1366 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1367                        struct GNUNET_MESH_Tunnel *tunnel,
1368                        const struct GNUNET_PeerIdentity *sender,
1369                        const struct GNUNET_STREAM_MessageHeader *msg,
1370                        const struct GNUNET_ATS_Information*atsi)
1371 {
1372   struct GNUNET_STREAM_MessageHeader *reply;
1373
1374   switch (socket->state)
1375     {
1376     case STATE_ESTABLISHED:
1377       socket->state = STATE_RECEIVE_CLOSED;
1378
1379       /* Send TRANSMIT_CLOSE_ACK */
1380       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1381       reply->header.type = 
1382         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1383       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1384       queue_message (socket, reply, NULL, NULL);
1385       break;
1386
1387     default:
1388       /* FIXME: Call statistics? */
1389       break;
1390     }
1391   return GNUNET_YES;
1392 }
1393
1394
1395 /**
1396  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1397  *
1398  * @param cls the socket (set from GNUNET_MESH_connect)
1399  * @param tunnel connection to the other end
1400  * @param tunnel_ctx this is NULL
1401  * @param sender who sent the message
1402  * @param message the actual message
1403  * @param atsi performance data for the connection
1404  * @return GNUNET_OK to keep the connection open,
1405  *         GNUNET_SYSERR to close it (signal serious error)
1406  */
1407 static int
1408 client_handle_transmit_close (void *cls,
1409                               struct GNUNET_MESH_Tunnel *tunnel,
1410                               void **tunnel_ctx,
1411                               const struct GNUNET_PeerIdentity *sender,
1412                               const struct GNUNET_MessageHeader *message,
1413                               const struct GNUNET_ATS_Information*atsi)
1414 {
1415   struct GNUNET_STREAM_Socket *socket = cls;
1416   
1417   return handle_transmit_close (socket,
1418                                 tunnel,
1419                                 sender,
1420                                 (struct GNUNET_STREAM_MessageHeader *)message,
1421                                 atsi);
1422 }
1423
1424
1425 /**
1426  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1427  *
1428  * @param cls the socket (set from GNUNET_MESH_connect)
1429  * @param tunnel connection to the other end
1430  * @param tunnel_ctx this is NULL
1431  * @param sender who sent the message
1432  * @param message the actual message
1433  * @param atsi performance data for the connection
1434  * @return GNUNET_OK to keep the connection open,
1435  *         GNUNET_SYSERR to close it (signal serious error)
1436  */
1437 static int
1438 client_handle_transmit_close_ack (void *cls,
1439                                   struct GNUNET_MESH_Tunnel *tunnel,
1440                                   void **tunnel_ctx,
1441                                   const struct GNUNET_PeerIdentity *sender,
1442                                   const struct GNUNET_MessageHeader *message,
1443                                   const struct GNUNET_ATS_Information*atsi)
1444 {
1445   struct GNUNET_STREAM_Socket *socket = cls;
1446
1447   return GNUNET_OK;
1448 }
1449
1450
1451 /**
1452  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1453  *
1454  * @param cls the socket (set from GNUNET_MESH_connect)
1455  * @param tunnel connection to the other end
1456  * @param tunnel_ctx this is NULL
1457  * @param sender who sent the message
1458  * @param message the actual message
1459  * @param atsi performance data for the connection
1460  * @return GNUNET_OK to keep the connection open,
1461  *         GNUNET_SYSERR to close it (signal serious error)
1462  */
1463 static int
1464 client_handle_receive_close (void *cls,
1465                              struct GNUNET_MESH_Tunnel *tunnel,
1466                              void **tunnel_ctx,
1467                              const struct GNUNET_PeerIdentity *sender,
1468                              const struct GNUNET_MessageHeader *message,
1469                              const struct GNUNET_ATS_Information*atsi)
1470 {
1471   struct GNUNET_STREAM_Socket *socket = cls;
1472
1473   return GNUNET_OK;
1474 }
1475
1476
1477 /**
1478  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1479  *
1480  * @param cls the socket (set from GNUNET_MESH_connect)
1481  * @param tunnel connection to the other end
1482  * @param tunnel_ctx this is NULL
1483  * @param sender who sent the message
1484  * @param message the actual message
1485  * @param atsi performance data for the connection
1486  * @return GNUNET_OK to keep the connection open,
1487  *         GNUNET_SYSERR to close it (signal serious error)
1488  */
1489 static int
1490 client_handle_receive_close_ack (void *cls,
1491                                  struct GNUNET_MESH_Tunnel *tunnel,
1492                                  void **tunnel_ctx,
1493                                  const struct GNUNET_PeerIdentity *sender,
1494                                  const struct GNUNET_MessageHeader *message,
1495                                  const struct GNUNET_ATS_Information*atsi)
1496 {
1497   struct GNUNET_STREAM_Socket *socket = cls;
1498
1499   return GNUNET_OK;
1500 }
1501
1502
1503 /**
1504  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1505  *
1506  * @param cls the socket (set from GNUNET_MESH_connect)
1507  * @param tunnel connection to the other end
1508  * @param tunnel_ctx this is NULL
1509  * @param sender who sent the message
1510  * @param message the actual message
1511  * @param atsi performance data for the connection
1512  * @return GNUNET_OK to keep the connection open,
1513  *         GNUNET_SYSERR to close it (signal serious error)
1514  */
1515 static int
1516 client_handle_close (void *cls,
1517                      struct GNUNET_MESH_Tunnel *tunnel,
1518                      void **tunnel_ctx,
1519                      const struct GNUNET_PeerIdentity *sender,
1520                      const struct GNUNET_MessageHeader *message,
1521                      const struct GNUNET_ATS_Information*atsi)
1522 {
1523   struct GNUNET_STREAM_Socket *socket = cls;
1524
1525   return GNUNET_OK;
1526 }
1527
1528
1529 /**
1530  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1531  *
1532  * @param cls the socket (set from GNUNET_MESH_connect)
1533  * @param tunnel connection to the other end
1534  * @param tunnel_ctx this is NULL
1535  * @param sender who sent the message
1536  * @param message the actual message
1537  * @param atsi performance data for the connection
1538  * @return GNUNET_OK to keep the connection open,
1539  *         GNUNET_SYSERR to close it (signal serious error)
1540  */
1541 static int
1542 client_handle_close_ack (void *cls,
1543                          struct GNUNET_MESH_Tunnel *tunnel,
1544                          void **tunnel_ctx,
1545                          const struct GNUNET_PeerIdentity *sender,
1546                          const struct GNUNET_MessageHeader *message,
1547                          const struct GNUNET_ATS_Information*atsi)
1548 {
1549   struct GNUNET_STREAM_Socket *socket = cls;
1550
1551   return GNUNET_OK;
1552 }
1553
1554 /*****************************/
1555 /* Server's Message Handlers */
1556 /*****************************/
1557
1558 /**
1559  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1560  *
1561  * @param cls the closure
1562  * @param tunnel connection to the other end
1563  * @param tunnel_ctx the socket
1564  * @param sender who sent the message
1565  * @param message the actual message
1566  * @param atsi performance data for the connection
1567  * @return GNUNET_OK to keep the connection open,
1568  *         GNUNET_SYSERR to close it (signal serious error)
1569  */
1570 static int
1571 server_handle_data (void *cls,
1572                     struct GNUNET_MESH_Tunnel *tunnel,
1573                     void **tunnel_ctx,
1574                     const struct GNUNET_PeerIdentity *sender,
1575                     const struct GNUNET_MessageHeader *message,
1576                     const struct GNUNET_ATS_Information*atsi)
1577 {
1578   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1579
1580   return handle_data (socket,
1581                       tunnel,
1582                       sender,
1583                       (const struct GNUNET_STREAM_DataMessage *)message,
1584                       atsi);
1585 }
1586
1587
1588 /**
1589  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1590  *
1591  * @param cls the closure
1592  * @param tunnel connection to the other end
1593  * @param tunnel_ctx the socket
1594  * @param sender who sent the message
1595  * @param message the actual message
1596  * @param atsi performance data for the connection
1597  * @return GNUNET_OK to keep the connection open,
1598  *         GNUNET_SYSERR to close it (signal serious error)
1599  */
1600 static int
1601 server_handle_hello (void *cls,
1602                      struct GNUNET_MESH_Tunnel *tunnel,
1603                      void **tunnel_ctx,
1604                      const struct GNUNET_PeerIdentity *sender,
1605                      const struct GNUNET_MessageHeader *message,
1606                      const struct GNUNET_ATS_Information*atsi)
1607 {
1608   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1609   struct GNUNET_STREAM_HelloAckMessage *reply;
1610
1611   if (GNUNET_PEER_search (sender) != socket->other_peer)
1612     {
1613       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614                   "%x: Received HELLO from non-confirming peer\n",
1615                   socket->our_id);
1616       return GNUNET_YES;
1617     }
1618
1619   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1620                  ntohs (message->type));
1621   GNUNET_assert (socket->tunnel == tunnel);
1622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1623               "%x: Received HELLO from %x\n", 
1624               socket->our_id,
1625               socket->other_peer);
1626
1627   if (STATE_INIT == socket->state)
1628     {
1629       reply = generate_hello_ack_msg (socket);
1630       queue_message (socket, 
1631                      &reply->header,
1632                      &set_state_hello_wait, 
1633                      NULL);
1634     }
1635   else
1636     {
1637       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1638                   "Client sent HELLO when in state %d\n", socket->state);
1639       /* FIXME: Send RESET? */
1640       
1641     }
1642   return GNUNET_OK;
1643 }
1644
1645
1646 /**
1647  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1648  *
1649  * @param cls the closure
1650  * @param tunnel connection to the other end
1651  * @param tunnel_ctx the socket
1652  * @param sender who sent the message
1653  * @param message the actual message
1654  * @param atsi performance data for the connection
1655  * @return GNUNET_OK to keep the connection open,
1656  *         GNUNET_SYSERR to close it (signal serious error)
1657  */
1658 static int
1659 server_handle_hello_ack (void *cls,
1660                          struct GNUNET_MESH_Tunnel *tunnel,
1661                          void **tunnel_ctx,
1662                          const struct GNUNET_PeerIdentity *sender,
1663                          const struct GNUNET_MessageHeader *message,
1664                          const struct GNUNET_ATS_Information*atsi)
1665 {
1666   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1667   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1668
1669   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1670                  ntohs (message->type));
1671   GNUNET_assert (socket->tunnel == tunnel);
1672   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1673   if (STATE_HELLO_WAIT == socket->state)
1674     {
1675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1676                   "%x: Received HELLO_ACK from %x\n",
1677                   socket->our_id,
1678                   socket->other_peer);
1679       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1680       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1681                   "%x: Read sequence number %u\n",
1682                   socket->our_id,
1683                   (unsigned int) socket->read_sequence_number);
1684       socket->receiver_window_available = 
1685         ntohl (ack_message->receiver_window_size);
1686       /* Attain ESTABLISHED state */
1687       set_state_established (NULL, socket);
1688     }
1689   else
1690     {
1691       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1692                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1693       /* FIXME: Send RESET? */
1694       
1695     }
1696   return GNUNET_OK;
1697 }
1698
1699
1700 /**
1701  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1702  *
1703  * @param cls the closure
1704  * @param tunnel connection to the other end
1705  * @param tunnel_ctx the socket
1706  * @param sender who sent the message
1707  * @param message the actual message
1708  * @param atsi performance data for the connection
1709  * @return GNUNET_OK to keep the connection open,
1710  *         GNUNET_SYSERR to close it (signal serious error)
1711  */
1712 static int
1713 server_handle_reset (void *cls,
1714                      struct GNUNET_MESH_Tunnel *tunnel,
1715                      void **tunnel_ctx,
1716                      const struct GNUNET_PeerIdentity *sender,
1717                      const struct GNUNET_MessageHeader *message,
1718                      const struct GNUNET_ATS_Information*atsi)
1719 {
1720   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1721
1722   return GNUNET_OK;
1723 }
1724
1725
1726 /**
1727  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1728  *
1729  * @param cls the closure
1730  * @param tunnel connection to the other end
1731  * @param tunnel_ctx the socket
1732  * @param sender who sent the message
1733  * @param message the actual message
1734  * @param atsi performance data for the connection
1735  * @return GNUNET_OK to keep the connection open,
1736  *         GNUNET_SYSERR to close it (signal serious error)
1737  */
1738 static int
1739 server_handle_transmit_close (void *cls,
1740                               struct GNUNET_MESH_Tunnel *tunnel,
1741                               void **tunnel_ctx,
1742                               const struct GNUNET_PeerIdentity *sender,
1743                               const struct GNUNET_MessageHeader *message,
1744                               const struct GNUNET_ATS_Information*atsi)
1745 {
1746   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1747
1748   return handle_transmit_close (socket,
1749                                 tunnel,
1750                                 sender,
1751                                 (struct GNUNET_STREAM_MessageHeader *)message,
1752                                 atsi);
1753 }
1754
1755
1756 /**
1757  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1758  *
1759  * @param cls the closure
1760  * @param tunnel connection to the other end
1761  * @param tunnel_ctx the socket
1762  * @param sender who sent the message
1763  * @param message the actual message
1764  * @param atsi performance data for the connection
1765  * @return GNUNET_OK to keep the connection open,
1766  *         GNUNET_SYSERR to close it (signal serious error)
1767  */
1768 static int
1769 server_handle_transmit_close_ack (void *cls,
1770                                   struct GNUNET_MESH_Tunnel *tunnel,
1771                                   void **tunnel_ctx,
1772                                   const struct GNUNET_PeerIdentity *sender,
1773                                   const struct GNUNET_MessageHeader *message,
1774                                   const struct GNUNET_ATS_Information*atsi)
1775 {
1776   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1777
1778   return GNUNET_OK;
1779 }
1780
1781
1782 /**
1783  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1784  *
1785  * @param cls the closure
1786  * @param tunnel connection to the other end
1787  * @param tunnel_ctx the socket
1788  * @param sender who sent the message
1789  * @param message the actual message
1790  * @param atsi performance data for the connection
1791  * @return GNUNET_OK to keep the connection open,
1792  *         GNUNET_SYSERR to close it (signal serious error)
1793  */
1794 static int
1795 server_handle_receive_close (void *cls,
1796                              struct GNUNET_MESH_Tunnel *tunnel,
1797                              void **tunnel_ctx,
1798                              const struct GNUNET_PeerIdentity *sender,
1799                              const struct GNUNET_MessageHeader *message,
1800                              const struct GNUNET_ATS_Information*atsi)
1801 {
1802   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1803
1804   return GNUNET_OK;
1805 }
1806
1807
1808 /**
1809  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1810  *
1811  * @param cls the closure
1812  * @param tunnel connection to the other end
1813  * @param tunnel_ctx the socket
1814  * @param sender who sent the message
1815  * @param message the actual message
1816  * @param atsi performance data for the connection
1817  * @return GNUNET_OK to keep the connection open,
1818  *         GNUNET_SYSERR to close it (signal serious error)
1819  */
1820 static int
1821 server_handle_receive_close_ack (void *cls,
1822                                  struct GNUNET_MESH_Tunnel *tunnel,
1823                                  void **tunnel_ctx,
1824                                  const struct GNUNET_PeerIdentity *sender,
1825                                  const struct GNUNET_MessageHeader *message,
1826                                  const struct GNUNET_ATS_Information*atsi)
1827 {
1828   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1829
1830   return GNUNET_OK;
1831 }
1832
1833
1834 /**
1835  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1836  *
1837  * @param cls the closure
1838  * @param tunnel connection to the other end
1839  * @param tunnel_ctx the socket
1840  * @param sender who sent the message
1841  * @param message the actual message
1842  * @param atsi performance data for the connection
1843  * @return GNUNET_OK to keep the connection open,
1844  *         GNUNET_SYSERR to close it (signal serious error)
1845  */
1846 static int
1847 server_handle_close (void *cls,
1848                      struct GNUNET_MESH_Tunnel *tunnel,
1849                      void **tunnel_ctx,
1850                      const struct GNUNET_PeerIdentity *sender,
1851                      const struct GNUNET_MessageHeader *message,
1852                      const struct GNUNET_ATS_Information*atsi)
1853 {
1854   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1855
1856   return GNUNET_OK;
1857 }
1858
1859
1860 /**
1861  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1862  *
1863  * @param cls the closure
1864  * @param tunnel connection to the other end
1865  * @param tunnel_ctx the socket
1866  * @param sender who sent the message
1867  * @param message the actual message
1868  * @param atsi performance data for the connection
1869  * @return GNUNET_OK to keep the connection open,
1870  *         GNUNET_SYSERR to close it (signal serious error)
1871  */
1872 static int
1873 server_handle_close_ack (void *cls,
1874                          struct GNUNET_MESH_Tunnel *tunnel,
1875                          void **tunnel_ctx,
1876                          const struct GNUNET_PeerIdentity *sender,
1877                          const struct GNUNET_MessageHeader *message,
1878                          const struct GNUNET_ATS_Information*atsi)
1879 {
1880   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1881
1882   return GNUNET_OK;
1883 }
1884
1885
1886 /**
1887  * Message Handler for mesh
1888  *
1889  * @param socket the socket through which the ack was received
1890  * @param tunnel connection to the other end
1891  * @param sender who sent the message
1892  * @param ack the acknowledgment message
1893  * @param atsi performance data for the connection
1894  * @return GNUNET_OK to keep the connection open,
1895  *         GNUNET_SYSERR to close it (signal serious error)
1896  */
1897 static int
1898 handle_ack (struct GNUNET_STREAM_Socket *socket,
1899             struct GNUNET_MESH_Tunnel *tunnel,
1900             const struct GNUNET_PeerIdentity *sender,
1901             const struct GNUNET_STREAM_AckMessage *ack,
1902             const struct GNUNET_ATS_Information*atsi)
1903 {
1904   unsigned int packet;
1905   int need_retransmission;
1906   
1907
1908   if (GNUNET_PEER_search (sender) != socket->other_peer)
1909     {
1910       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1911                   "%x: Received ACK from non-confirming peer\n",
1912                   socket->our_id);
1913       return GNUNET_YES;
1914     }
1915
1916   switch (socket->state)
1917     {
1918     case (STATE_ESTABLISHED):
1919       if (NULL == socket->write_handle)
1920         {
1921           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1922                       "%x: Received DATA_ACK when write_handle is NULL\n",
1923                       socket->our_id);
1924           return GNUNET_OK;
1925         }
1926       /* FIXME: increment in the base sequence number is breaking current flow
1927        */
1928       if (!((socket->write_sequence_number 
1929              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1930         {
1931           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1932                       "%x: Received DATA_ACK with unexpected base sequence "
1933                       "number\n",
1934                       socket->our_id);
1935           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1936                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
1937                       socket->our_id,
1938                       socket->write_sequence_number,
1939                       ntohl (ack->base_sequence_number));
1940           return GNUNET_OK;
1941         }
1942       /* FIXME: include the case when write_handle is cancelled - ignore the 
1943          acks */
1944
1945       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1946                   "%x: Received DATA_ACK from %x\n",
1947                   socket->our_id,
1948                   socket->other_peer);
1949       
1950       /* Cancel the retransmission task */
1951       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1952         {
1953           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1954           socket->retransmission_timeout_task_id = 
1955             GNUNET_SCHEDULER_NO_TASK;
1956         }
1957
1958       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1959         {
1960           if (NULL == socket->write_handle->messages[packet]) break;
1961           if (ntohl (ack->base_sequence_number)
1962               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
1963             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1964                                   packet,
1965                                   GNUNET_YES);
1966           else
1967             if (GNUNET_YES == 
1968                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
1969                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
1970                                       - ntohl (ack->base_sequence_number)))
1971               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1972                                     packet,
1973                                     GNUNET_YES);
1974         }
1975
1976       /* Update the receive window remaining
1977        FIXME : Should update with the value from a data ack with greater
1978        sequence number */
1979       socket->receiver_window_available = 
1980         ntohl (ack->receive_window_remaining);
1981
1982       /* Check if we have received all acknowledgements */
1983       need_retransmission = GNUNET_NO;
1984       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1985         {
1986           if (NULL == socket->write_handle->messages[packet]) break;
1987           if (GNUNET_YES != ackbitmap_is_bit_set 
1988               (&socket->write_handle->ack_bitmap,packet))
1989             {
1990               need_retransmission = GNUNET_YES;
1991               break;
1992             }
1993         }
1994       if (GNUNET_YES == need_retransmission)
1995         {
1996           write_data (socket);
1997         }
1998       else      /* We have to call the write continuation callback now */
1999         {
2000           /* Free the packets */
2001           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2002             {
2003               GNUNET_free_non_null (socket->write_handle->messages[packet]);
2004             }
2005           if (NULL != socket->write_handle->write_cont)
2006             socket->write_handle->write_cont
2007               (socket->write_handle->write_cont_cls,
2008                socket->status,
2009                socket->write_handle->size);
2010           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2011                       "%x: Write completion callback completed\n",
2012                       socket->our_id);
2013           /* We are done with the write handle - Freeing it */
2014           GNUNET_free (socket->write_handle);
2015           socket->write_handle = NULL;
2016         }
2017       break;
2018     default:
2019       break;
2020     }
2021   return GNUNET_OK;
2022 }
2023
2024
2025 /**
2026  * Message Handler for mesh
2027  *
2028  * @param cls the 'struct GNUNET_STREAM_Socket'
2029  * @param tunnel connection to the other end
2030  * @param tunnel_ctx unused
2031  * @param sender who sent the message
2032  * @param message the actual message
2033  * @param atsi performance data for the connection
2034  * @return GNUNET_OK to keep the connection open,
2035  *         GNUNET_SYSERR to close it (signal serious error)
2036  */
2037 static int
2038 client_handle_ack (void *cls,
2039                    struct GNUNET_MESH_Tunnel *tunnel,
2040                    void **tunnel_ctx,
2041                    const struct GNUNET_PeerIdentity *sender,
2042                    const struct GNUNET_MessageHeader *message,
2043                    const struct GNUNET_ATS_Information*atsi)
2044 {
2045   struct GNUNET_STREAM_Socket *socket = cls;
2046   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2047  
2048   return handle_ack (socket, tunnel, sender, ack, atsi);
2049 }
2050
2051
2052 /**
2053  * Message Handler for mesh
2054  *
2055  * @param cls the server's listen socket
2056  * @param tunnel connection to the other end
2057  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2058  * @param sender who sent the message
2059  * @param message the actual message
2060  * @param atsi performance data for the connection
2061  * @return GNUNET_OK to keep the connection open,
2062  *         GNUNET_SYSERR to close it (signal serious error)
2063  */
2064 static int
2065 server_handle_ack (void *cls,
2066                    struct GNUNET_MESH_Tunnel *tunnel,
2067                    void **tunnel_ctx,
2068                    const struct GNUNET_PeerIdentity *sender,
2069                    const struct GNUNET_MessageHeader *message,
2070                    const struct GNUNET_ATS_Information*atsi)
2071 {
2072   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2073   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2074  
2075   return handle_ack (socket, tunnel, sender, ack, atsi);
2076 }
2077
2078
2079 /**
2080  * For client message handlers, the stream socket is in the
2081  * closure argument.
2082  */
2083 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2084   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2085   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2086    sizeof (struct GNUNET_STREAM_AckMessage) },
2087   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2088    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2089   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2090    sizeof (struct GNUNET_STREAM_MessageHeader)},
2091   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2092    sizeof (struct GNUNET_STREAM_MessageHeader)},
2093   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2094    sizeof (struct GNUNET_STREAM_MessageHeader)},
2095   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2096    sizeof (struct GNUNET_STREAM_MessageHeader)},
2097   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2098    sizeof (struct GNUNET_STREAM_MessageHeader)},
2099   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2100    sizeof (struct GNUNET_STREAM_MessageHeader)},
2101   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2102    sizeof (struct GNUNET_STREAM_MessageHeader)},
2103   {NULL, 0, 0}
2104 };
2105
2106
2107 /**
2108  * For server message handlers, the stream socket is in the
2109  * tunnel context, and the listen socket in the closure argument.
2110  */
2111 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2112   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2113   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2114    sizeof (struct GNUNET_STREAM_AckMessage) },
2115   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2116    sizeof (struct GNUNET_STREAM_MessageHeader)},
2117   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2118    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2119   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2120    sizeof (struct GNUNET_STREAM_MessageHeader)},
2121   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2122    sizeof (struct GNUNET_STREAM_MessageHeader)},
2123   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2124    sizeof (struct GNUNET_STREAM_MessageHeader)},
2125   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2126    sizeof (struct GNUNET_STREAM_MessageHeader)},
2127   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2128    sizeof (struct GNUNET_STREAM_MessageHeader)},
2129   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2130    sizeof (struct GNUNET_STREAM_MessageHeader)},
2131   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2132    sizeof (struct GNUNET_STREAM_MessageHeader)},
2133   {NULL, 0, 0}
2134 };
2135
2136
2137 /**
2138  * Function called when our target peer is connected to our tunnel
2139  *
2140  * @param cls the socket for which this tunnel is created
2141  * @param peer the peer identity of the target
2142  * @param atsi performance data for the connection
2143  */
2144 static void
2145 mesh_peer_connect_callback (void *cls,
2146                             const struct GNUNET_PeerIdentity *peer,
2147                             const struct GNUNET_ATS_Information * atsi)
2148 {
2149   struct GNUNET_STREAM_Socket *socket = cls;
2150   struct GNUNET_STREAM_MessageHeader *message;
2151   GNUNET_PEER_Id connected_peer;
2152
2153   connected_peer = GNUNET_PEER_search (peer);
2154   
2155   if (connected_peer != socket->other_peer)
2156     {
2157       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2158                   "%x: A peer which is not our target has connected",
2159                   "to our tunnel\n",
2160                   socket->our_id);
2161       return;
2162     }
2163   
2164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2165               "%x: Target peer %x connected\n", 
2166               socket->our_id,
2167               connected_peer);
2168   
2169   /* Set state to INIT */
2170   socket->state = STATE_INIT;
2171
2172   /* Send HELLO message */
2173   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2174   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2175   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2176   queue_message (socket,
2177                  message,
2178                  &set_state_hello_wait,
2179                  NULL);
2180
2181   /* Call open callback */
2182   if (NULL == socket->open_cb)
2183     {
2184       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2185                   "STREAM_open callback is NULL\n");
2186     }
2187 }
2188
2189
2190 /**
2191  * Function called when our target peer is disconnected from our tunnel
2192  *
2193  * @param cls the socket associated which this tunnel
2194  * @param peer the peer identity of the target
2195  */
2196 static void
2197 mesh_peer_disconnect_callback (void *cls,
2198                                const struct GNUNET_PeerIdentity *peer)
2199 {
2200
2201 }
2202
2203
2204 /**
2205  * Method called whenever a peer creates a tunnel to us
2206  *
2207  * @param cls closure
2208  * @param tunnel new handle to the tunnel
2209  * @param initiator peer that started the tunnel
2210  * @param atsi performance information for the tunnel
2211  * @return initial tunnel context for the tunnel
2212  *         (can be NULL -- that's not an error)
2213  */
2214 static void *
2215 new_tunnel_notify (void *cls,
2216                    struct GNUNET_MESH_Tunnel *tunnel,
2217                    const struct GNUNET_PeerIdentity *initiator,
2218                    const struct GNUNET_ATS_Information *atsi)
2219 {
2220   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2221   struct GNUNET_STREAM_Socket *socket;
2222
2223   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2224      from the same peer again until the socket is closed */
2225
2226   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2227   socket->other_peer = GNUNET_PEER_intern (initiator);
2228   socket->tunnel = tunnel;
2229   socket->session_id = 0;       /* FIXME */
2230   socket->state = STATE_INIT;
2231   socket->lsocket = lsocket;
2232   socket->our_id = lsocket->our_id;
2233   
2234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2235               "%x: Peer %x initiated tunnel to us\n", 
2236               socket->our_id,
2237               socket->other_peer);
2238   
2239   /* FIXME: Copy MESH handle from lsocket to socket */
2240   
2241   return socket;
2242 }
2243
2244
2245 /**
2246  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2247  * any associated state.  This function is NOT called if the client has
2248  * explicitly asked for the tunnel to be destroyed using
2249  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2250  * the tunnel.
2251  *
2252  * @param cls closure (set from GNUNET_MESH_connect)
2253  * @param tunnel connection to the other end (henceforth invalid)
2254  * @param tunnel_ctx place where local state associated
2255  *                   with the tunnel is stored
2256  */
2257 static void 
2258 tunnel_cleaner (void *cls,
2259                 const struct GNUNET_MESH_Tunnel *tunnel,
2260                 void *tunnel_ctx)
2261 {
2262   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2263   
2264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2265               "%x: Peer %x has terminated connection abruptly\n",
2266               socket->our_id,
2267               socket->other_peer);
2268
2269   socket->status = GNUNET_STREAM_SHUTDOWN;
2270
2271   /* Clear Transmit handles */
2272   if (NULL != socket->transmit_handle)
2273     {
2274       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2275       socket->transmit_handle = NULL;
2276     }
2277   socket->tunnel = NULL;
2278 }
2279
2280
2281 /*****************/
2282 /* API functions */
2283 /*****************/
2284
2285
2286 /**
2287  * Tries to open a stream to the target peer
2288  *
2289  * @param cfg configuration to use
2290  * @param target the target peer to which the stream has to be opened
2291  * @param app_port the application port number which uniquely identifies this
2292  *            stream
2293  * @param open_cb this function will be called after stream has be established 
2294  * @param open_cb_cls the closure for open_cb
2295  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2296  * @return if successful it returns the stream socket; NULL if stream cannot be
2297  *         opened 
2298  */
2299 struct GNUNET_STREAM_Socket *
2300 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2301                     const struct GNUNET_PeerIdentity *target,
2302                     GNUNET_MESH_ApplicationType app_port,
2303                     GNUNET_STREAM_OpenCallback open_cb,
2304                     void *open_cb_cls,
2305                     ...)
2306 {
2307   struct GNUNET_STREAM_Socket *socket;
2308   struct GNUNET_PeerIdentity own_peer_id;
2309   enum GNUNET_STREAM_Option option;
2310   va_list vargs;                /* Variable arguments */
2311
2312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2313               "%s\n", __func__);
2314
2315   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2316   socket->other_peer = GNUNET_PEER_intern (target);
2317   socket->open_cb = open_cb;
2318   socket->open_cls = open_cb_cls;
2319   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2320   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2321   
2322   /* Set defaults */
2323   socket->retransmit_timeout = 
2324     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2325
2326   va_start (vargs, open_cb_cls); /* Parse variable args */
2327   do {
2328     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2329     switch (option)
2330       {
2331       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2332         /* Expect struct GNUNET_TIME_Relative */
2333         socket->retransmit_timeout = va_arg (vargs,
2334                                              struct GNUNET_TIME_Relative);
2335         break;
2336       case GNUNET_STREAM_OPTION_END:
2337         break;
2338       }
2339   } while (GNUNET_STREAM_OPTION_END != option);
2340   va_end (vargs);               /* End of variable args parsing */
2341   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2342                                       10,  /* QUEUE size as parameter? */
2343                                       socket, /* cls */
2344                                       NULL, /* No inbound tunnel handler */
2345                                       &tunnel_cleaner, /* FIXME: not required? */
2346                                       client_message_handlers,
2347                                       &app_port); /* We don't get inbound tunnels */
2348   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2349     {
2350       GNUNET_free (socket);
2351       return NULL;
2352     }
2353
2354   /* Now create the mesh tunnel to target */
2355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2356               "Creating MESH Tunnel\n");
2357   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2358                                               NULL, /* Tunnel context */
2359                                               &mesh_peer_connect_callback,
2360                                               &mesh_peer_disconnect_callback,
2361                                               socket);
2362   GNUNET_assert (NULL != socket->tunnel);
2363   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2364                                         target);
2365   
2366   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2367               "%s() END\n", __func__);
2368   return socket;
2369 }
2370
2371
2372 /**
2373  * Shutdown the stream for reading or writing (man 2 shutdown).
2374  *
2375  * @param socket the stream socket
2376  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2377  * @return the shutdown handle
2378  */
2379 struct GNUNET_STREAM_ShutdownHandle *
2380 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2381                         int how)
2382 {
2383   return NULL;
2384 }
2385
2386
2387 /**
2388  * Cancels a pending shutdown
2389  *
2390  * @param the shutdown handle returned from GNUNET_STREAM_shutdown
2391  */
2392 void
2393 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2394 {
2395   return;
2396 }
2397
2398
2399 /**
2400  * Closes the stream
2401  *
2402  * @param socket the stream socket
2403  */
2404 void
2405 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2406 {
2407   struct MessageQueue *head;
2408
2409   GNUNET_break (NULL == socket->read_handle);
2410   GNUNET_break (NULL == socket->write_handle);
2411
2412   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2413     {
2414       /* socket closed with read task pending!? */
2415       GNUNET_break (0);
2416       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2417       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2418     }
2419   
2420   /* Terminate the ack'ing tasks if they are still present */
2421   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2422     {
2423       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2424       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2425     }
2426
2427   /* Clear Transmit handles */
2428   if (NULL != socket->transmit_handle)
2429     {
2430       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2431       socket->transmit_handle = NULL;
2432     }
2433
2434   /* Clear existing message queue */
2435   while (NULL != (head = socket->queue_head)) {
2436     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2437                                  socket->queue_tail,
2438                                  head);
2439     GNUNET_free (head->message);
2440     GNUNET_free (head);
2441   }
2442
2443   /* Close associated tunnel */
2444   if (NULL != socket->tunnel)
2445     {
2446       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2447       socket->tunnel = NULL;
2448     }
2449
2450   /* Close mesh connection */
2451   if (NULL != socket->mesh && NULL == socket->lsocket)
2452     {
2453       GNUNET_MESH_disconnect (socket->mesh);
2454       socket->mesh = NULL;
2455     }
2456   
2457   /* Release receive buffer */
2458   if (NULL != socket->receive_buffer)
2459     {
2460       GNUNET_free (socket->receive_buffer);
2461     }
2462
2463   GNUNET_free (socket);
2464 }
2465
2466
2467 /**
2468  * Listens for stream connections for a specific application ports
2469  *
2470  * @param cfg the configuration to use
2471  * @param app_port the application port for which new streams will be accepted
2472  * @param listen_cb this function will be called when a peer tries to establish
2473  *            a stream with us
2474  * @param listen_cb_cls closure for listen_cb
2475  * @return listen socket, NULL for any error
2476  */
2477 struct GNUNET_STREAM_ListenSocket *
2478 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2479                       GNUNET_MESH_ApplicationType app_port,
2480                       GNUNET_STREAM_ListenCallback listen_cb,
2481                       void *listen_cb_cls)
2482 {
2483   /* FIXME: Add variable args for passing configration options? */
2484   struct GNUNET_STREAM_ListenSocket *lsocket;
2485   struct GNUNET_PeerIdentity our_peer_id;
2486
2487   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2488   lsocket->port = app_port;
2489   lsocket->listen_cb = listen_cb;
2490   lsocket->listen_cb_cls = listen_cb_cls;
2491   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2492   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2493   lsocket->mesh = GNUNET_MESH_connect (cfg,
2494                                        10, /* FIXME: QUEUE size as parameter? */
2495                                        lsocket, /* Closure */
2496                                        &new_tunnel_notify,
2497                                        &tunnel_cleaner,
2498                                        server_message_handlers,
2499                                        &app_port);
2500   GNUNET_assert (NULL != lsocket->mesh);
2501   return lsocket;
2502 }
2503
2504
2505 /**
2506  * Closes the listen socket
2507  *
2508  * @param lsocket the listen socket
2509  */
2510 void
2511 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2512 {
2513   /* Close MESH connection */
2514   GNUNET_assert (NULL != lsocket->mesh);
2515   GNUNET_MESH_disconnect (lsocket->mesh);
2516   
2517   GNUNET_free (lsocket);
2518 }
2519
2520
2521 /**
2522  * Tries to write the given data to the stream
2523  *
2524  * @param socket the socket representing a stream
2525  * @param data the data buffer from where the data is written into the stream
2526  * @param size the number of bytes to be written from the data buffer
2527  * @param timeout the timeout period
2528  * @param write_cont the function to call upon writing some bytes into the stream
2529  * @param write_cont_cls the closure
2530  * @return handle to cancel the operation
2531  */
2532 struct GNUNET_STREAM_IOWriteHandle *
2533 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2534                      const void *data,
2535                      size_t size,
2536                      struct GNUNET_TIME_Relative timeout,
2537                      GNUNET_STREAM_CompletionContinuation write_cont,
2538                      void *write_cont_cls)
2539 {
2540   unsigned int num_needed_packets;
2541   unsigned int packet;
2542   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2543   uint32_t packet_size;
2544   uint32_t payload_size;
2545   struct GNUNET_STREAM_DataMessage *data_msg;
2546   const void *sweep;
2547   struct GNUNET_TIME_Relative ack_deadline;
2548
2549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2550               "%s\n", __func__);
2551
2552   /* Return NULL if there is already a write request pending */
2553   if (NULL != socket->write_handle)
2554   {
2555     GNUNET_break (0);
2556     return NULL;
2557   }
2558   if (!((STATE_ESTABLISHED == socket->state)
2559         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2560         || (STATE_RECEIVE_CLOSED == socket->state)))
2561     {
2562       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2563                   "%x: Attempting to write on a closed (OR) not-yet-established"
2564                   "stream\n",
2565                   socket->our_id);
2566       return NULL;
2567     } 
2568   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2569     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2570   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2571   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2572   io_handle->socket = socket;
2573   io_handle->write_cont = write_cont;
2574   io_handle->write_cont_cls = write_cont_cls;
2575   io_handle->size = size;
2576   sweep = data;
2577   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2578      determined from RTT */
2579   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2580   /* Divide the given buffer into packets for sending */
2581   for (packet=0; packet < num_needed_packets; packet++)
2582     {
2583       if ((packet + 1) * max_payload_size < size) 
2584         {
2585           payload_size = max_payload_size;
2586           packet_size = MAX_PACKET_SIZE;
2587         }
2588       else 
2589         {
2590           payload_size = size - packet * max_payload_size;
2591           packet_size =  payload_size + sizeof (struct
2592                                                 GNUNET_STREAM_DataMessage); 
2593         }
2594       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2595       io_handle->messages[packet]->header.header.size = htons (packet_size);
2596       io_handle->messages[packet]->header.header.type =
2597         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2598       io_handle->messages[packet]->sequence_number =
2599         htonl (socket->write_sequence_number++);
2600       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2601
2602       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2603          determined from RTT */
2604       io_handle->messages[packet]->ack_deadline =
2605         GNUNET_TIME_relative_hton (ack_deadline);
2606       data_msg = io_handle->messages[packet];
2607       /* Copy data from given buffer to the packet */
2608       memcpy (&data_msg[1],
2609               sweep,
2610               payload_size);
2611       sweep += payload_size;
2612       socket->write_offset += payload_size;
2613     }
2614   socket->write_handle = io_handle;
2615   write_data (socket);
2616
2617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2618               "%s() END\n", __func__);
2619
2620   return io_handle;
2621 }
2622
2623
2624 /**
2625  * Tries to read data from the stream
2626  *
2627  * @param socket the socket representing a stream
2628  * @param timeout the timeout period
2629  * @param proc function to call with data (once only)
2630  * @param proc_cls the closure for proc
2631  * @return handle to cancel the operation
2632  */
2633 struct GNUNET_STREAM_IOReadHandle *
2634 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2635                     struct GNUNET_TIME_Relative timeout,
2636                     GNUNET_STREAM_DataProcessor proc,
2637                     void *proc_cls)
2638 {
2639   struct GNUNET_STREAM_IOReadHandle *read_handle;
2640   
2641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2642               "%x: %s()\n", 
2643               socket->our_id,
2644               __func__);
2645
2646   /* Return NULL if there is already a read handle; the user has to cancel that
2647   first before continuing or has to wait until it is completed */
2648   if (NULL != socket->read_handle) return NULL;
2649
2650   GNUNET_assert (NULL != proc);
2651
2652   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2653   read_handle->proc = proc;
2654   read_handle->proc_cls = proc_cls;
2655   socket->read_handle = read_handle;
2656
2657   /* Check if we have a packet at bitmap 0 */
2658   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2659                                           0))
2660     {
2661       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2662                                                        socket);
2663    
2664     }
2665   
2666   /* Setup the read timeout task */
2667   socket->read_io_timeout_task_id =
2668     GNUNET_SCHEDULER_add_delayed (timeout,
2669                                   &read_io_timeout,
2670                                   socket);
2671   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2672               "%x: %s() END\n",
2673               socket->our_id,
2674               __func__);
2675   return read_handle;
2676 }
2677
2678
2679 /**
2680  * Cancel pending write operation.
2681  *
2682  * @param ioh handle to operation to cancel
2683  */
2684 void
2685 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2686 {
2687   struct GNUNET_STREAM_Socket *socket = ioh->socket;
2688   unsigned int packet;
2689
2690   GNUNET_assert (NULL != socket->write_handle);
2691   GNUNET_assert (socket->write_handle == ioh);
2692
2693   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2694     {
2695       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2696       socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2697     }
2698
2699   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2700     {
2701       if (NULL == ioh->messages[packet]) break;
2702       GNUNET_free (ioh->messages[packet]);
2703     }
2704       
2705   GNUNET_free (socket->write_handle);
2706   socket->write_handle = NULL;
2707   return;
2708 }
2709
2710
2711 /**
2712  * Cancel pending read operation.
2713  *
2714  * @param ioh handle to operation to cancel
2715  */
2716 void
2717 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2718 {
2719   return;
2720 }