- de-duplication
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * The listen socket from which this socket is derived. Should be NULL if it
235    * is not a derived socket
236    */
237   struct GNUNET_STREAM_ListenSocket *lsocket;
238
239   /**
240    * Task identifier for the read io timeout task
241    */
242   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
243
244   /**
245    * Task identifier for retransmission task after timeout
246    */
247   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
248
249   /**
250    * The task for sending timely Acks
251    */
252   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
253
254   /**
255    * Task scheduled to continue a read operation.
256    */
257   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
258
259   /**
260    * The state of the protocol associated with this socket
261    */
262   enum State state;
263
264   /**
265    * The status of the socket
266    */
267   enum GNUNET_STREAM_Status status;
268
269   /**
270    * The number of previous timeouts; FIXME: currently not used
271    */
272   unsigned int retries;
273
274   /**
275    * The peer identity of the peer at the other end of the stream
276    */
277   GNUNET_PEER_Id other_peer;
278
279   /**
280    * Our Peer Identity (for debugging)
281    */
282   GNUNET_PEER_Id our_id;
283
284   /**
285    * The application port number (type: uint32_t)
286    */
287   GNUNET_MESH_ApplicationType app_port;
288
289   /**
290    * The session id associated with this stream connection
291    * FIXME: Not used currently, may be removed
292    */
293   uint32_t session_id;
294
295   /**
296    * Write sequence number. Set to random when sending HELLO(client) and
297    * HELLO_ACK(server) 
298    */
299   uint32_t write_sequence_number;
300
301   /**
302    * Read sequence number. This number's value is determined during handshake
303    */
304   uint32_t read_sequence_number;
305
306   /**
307    * The receiver buffer size
308    */
309   uint32_t receive_buffer_size;
310
311   /**
312    * The receiver buffer boundaries
313    */
314   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
315
316   /**
317    * receiver's available buffer after the last acknowledged packet
318    */
319   uint32_t receiver_window_available;
320
321   /**
322    * The offset pointer used during write operation
323    */
324   uint32_t write_offset;
325
326   /**
327    * The offset after which we are expecting data
328    */
329   uint32_t read_offset;
330
331   /**
332    * The offset upto which user has read from the received buffer
333    */
334   uint32_t copy_offset;
335 };
336
337
338 /**
339  * A socket for listening
340  */
341 struct GNUNET_STREAM_ListenSocket
342 {
343   /**
344    * The mesh handle
345    */
346   struct GNUNET_MESH_Handle *mesh;
347
348   /**
349    * The callback function which is called after successful opening socket
350    */
351   GNUNET_STREAM_ListenCallback listen_cb;
352
353   /**
354    * The call back closure
355    */
356   void *listen_cb_cls;
357
358   /**
359    * Our interned Peer's identity
360    */
361   GNUNET_PEER_Id our_id;
362
363   /**
364    * The service port
365    * FIXME: Remove if not required!
366    */
367   GNUNET_MESH_ApplicationType port;
368 };
369
370
371 /**
372  * The IO Write Handle
373  */
374 struct GNUNET_STREAM_IOWriteHandle
375 {
376   /**
377    * The packet_buffers associated with this Handle
378    */
379   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
380
381   /**
382    * The write continuation callback
383    */
384   GNUNET_STREAM_CompletionContinuation write_cont;
385
386   /**
387    * Write continuation closure
388    */
389   void *write_cont_cls;
390
391   /**
392    * The bitmap of this IOHandle; Corresponding bit for a message is set when
393    * it has been acknowledged by the receiver
394    */
395   GNUNET_STREAM_AckBitmap ack_bitmap;
396
397   /**
398    * Number of bytes in this write handle
399    */
400   size_t size;
401
402   /**
403    * Number of packets sent before waiting for an ack
404    *
405    * FIXME: Do we need this?
406    */
407   unsigned int sent_packets;
408 };
409
410
411 /**
412  * The IO Read Handle
413  */
414 struct GNUNET_STREAM_IOReadHandle
415 {
416   /**
417    * Callback for the read processor
418    */
419   GNUNET_STREAM_DataProcessor proc;
420
421   /**
422    * The closure pointer for the read processor callback
423    */
424   void *proc_cls;
425 };
426
427
428 /**
429  * Default value in seconds for various timeouts
430  */
431 static unsigned int default_timeout = 10;
432
433 /**
434  * Callback function for sending queued message
435  *
436  * @param cls closure the socket
437  * @param size number of bytes available in buf
438  * @param buf where the callee should write the message
439  * @return number of bytes written to buf
440  */
441 static size_t
442 send_message_notify (void *cls, size_t size, void *buf)
443 {
444   struct GNUNET_STREAM_Socket *socket = cls;
445   struct GNUNET_PeerIdentity target;
446   struct MessageQueue *head;
447   size_t ret;
448
449   socket->transmit_handle = NULL; /* Remove the transmit handle */
450   head = socket->queue_head;
451   if (NULL == head)
452     return 0; /* just to be safe */
453   GNUNET_PEER_resolve (socket->other_peer, &target);
454   if (0 == size)                /* request timed out */
455     {
456       socket->retries++;
457       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458                   "Message sending timed out. Retry %d \n",
459                   socket->retries);
460       socket->transmit_handle = 
461         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
462                                            0, /* Corking */
463                                            1, /* Priority */
464                                            /* FIXME: exponential backoff */
465                                            socket->retransmit_timeout,
466                                            &target,
467                                            ntohs (head->message->header.size),
468                                            &send_message_notify,
469                                            socket);
470       return 0;
471     }
472
473   ret = ntohs (head->message->header.size);
474   GNUNET_assert (size >= ret);
475   memcpy (buf, head->message, ret);
476   if (NULL != head->finish_cb)
477     {
478       head->finish_cb (head->finish_cb_cls, socket);
479     }
480   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
481                                socket->queue_tail,
482                                head);
483   GNUNET_free (head->message);
484   GNUNET_free (head);
485   head = socket->queue_head;
486   if (NULL != head)    /* more pending messages to send */
487     {
488       socket->retries = 0;
489       socket->transmit_handle = 
490         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
491                                            0, /* Corking */
492                                            1, /* Priority */
493                                            /* FIXME: exponential backoff */
494                                            socket->retransmit_timeout,
495                                            &target,
496                                            ntohs (head->message->header.size),
497                                            &send_message_notify,
498                                            socket);
499     }
500   return ret;
501 }
502
503
504 /**
505  * Queues a message for sending using the mesh connection of a socket
506  *
507  * @param socket the socket whose mesh connection is used
508  * @param message the message to be sent
509  * @param finish_cb the callback to be called when the message is sent
510  * @param finish_cb_cls the closure for the callback
511  */
512 static void
513 queue_message (struct GNUNET_STREAM_Socket *socket,
514                struct GNUNET_STREAM_MessageHeader *message,
515                SendFinishCallback finish_cb,
516                void *finish_cb_cls)
517 {
518   struct MessageQueue *queue_entity;
519   struct GNUNET_PeerIdentity target;
520
521   GNUNET_assert 
522     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
523      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
524
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526               "%x: Queueing message of type %d and size %d\n",
527               socket->our_id,
528               ntohs (message->header.type),
529               ntohs (message->header.size));
530   GNUNET_assert (NULL != message);
531   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
532   queue_entity->message = message;
533   queue_entity->finish_cb = finish_cb;
534   queue_entity->finish_cb_cls = finish_cb_cls;
535   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
536                                     socket->queue_tail,
537                                     queue_entity);
538   if (NULL == socket->transmit_handle)
539   {
540     socket->retries = 0;
541     GNUNET_PEER_resolve (socket->other_peer, &target);
542     socket->transmit_handle = 
543       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
544                                          0, /* Corking */
545                                          1, /* Priority */
546                                          socket->retransmit_timeout,
547                                          &target,
548                                          ntohs (message->header.size),
549                                          &send_message_notify,
550                                          socket);
551   }
552 }
553
554
555 /**
556  * Copies a message and queues it for sending using the mesh connection of
557  * given socket 
558  *
559  * @param socket the socket whose mesh connection is used
560  * @param message the message to be sent
561  * @param finish_cb the callback to be called when the message is sent
562  * @param finish_cb_cls the closure for the callback
563  */
564 static void
565 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
566                         const struct GNUNET_STREAM_MessageHeader *message,
567                         SendFinishCallback finish_cb,
568                         void *finish_cb_cls)
569 {
570   struct GNUNET_STREAM_MessageHeader *msg_copy;
571   uint16_t size;
572   
573   size = ntohs (message->header.size);
574   msg_copy = GNUNET_malloc (size);
575   memcpy (msg_copy, message, size);
576   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
577 }
578
579
580 /**
581  * Callback function for sending ack message
582  *
583  * @param cls closure the ACK message created in ack_task
584  * @param size number of bytes available in buffer
585  * @param buf where the callee should write the message
586  * @return number of bytes written to buf
587  */
588 static size_t
589 send_ack_notify (void *cls, size_t size, void *buf)
590 {
591   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
592
593   if (0 == size)
594     {
595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596                   "%s called with size 0\n", __func__);
597       return 0;
598     }
599   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
600   
601   size = ntohs (ack_msg->header.header.size);
602   memcpy (buf, ack_msg, size);
603   return size;
604 }
605
606 /**
607  * Writes data using the given socket. The amount of data written is limited by
608  * the receiver_window_size
609  *
610  * @param socket the socket to use
611  */
612 static void 
613 write_data (struct GNUNET_STREAM_Socket *socket);
614
615 /**
616  * Task for retransmitting data messages if they aren't ACK before their ack
617  * deadline 
618  *
619  * @param cls the socket
620  * @param tc the Task context
621  */
622 static void
623 retransmission_timeout_task (void *cls,
624                              const struct GNUNET_SCHEDULER_TaskContext *tc)
625 {
626   struct GNUNET_STREAM_Socket *socket = cls;
627   
628   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
629     return;
630
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "%x: Retransmitting DATA...\n", socket->our_id);
633   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
634   write_data (socket);
635 }
636
637
638 /**
639  * Task for sending ACK message
640  *
641  * @param cls the socket
642  * @param tc the Task context
643  */
644 static void
645 ack_task (void *cls,
646           const struct GNUNET_SCHEDULER_TaskContext *tc)
647 {
648   struct GNUNET_STREAM_Socket *socket = cls;
649   struct GNUNET_STREAM_AckMessage *ack_msg;
650   struct GNUNET_PeerIdentity target;
651
652   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
653     {
654       return;
655     }
656
657   socket->ack_task_id = 0;
658
659   /* Create the ACK Message */
660   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
661   ack_msg->header.header.size = htons (sizeof (struct 
662                                                GNUNET_STREAM_AckMessage));
663   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
664   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
665   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
666   ack_msg->receive_window_remaining = 
667     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
668
669   GNUNET_PEER_resolve (socket->other_peer, &target);
670   /* Request MESH for sending ACK */
671   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
672                                      0, /* Corking */
673                                      1, /* Priority */
674                                      socket->retransmit_timeout,
675                                      &target,
676                                      ntohs (ack_msg->header.header.size),
677                                      &send_ack_notify,
678                                      ack_msg);
679
680   
681 }
682
683
684 /**
685  * Function to modify a bit in GNUNET_STREAM_AckBitmap
686  *
687  * @param bitmap the bitmap to modify
688  * @param bit the bit number to modify
689  * @param value GNUNET_YES to on, GNUNET_NO to off
690  */
691 static void
692 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
693                       unsigned int bit, 
694                       int value)
695 {
696   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
697   if (GNUNET_YES == value)
698     *bitmap |= (1LL << bit);
699   else
700     *bitmap &= ~(1LL << bit);
701 }
702
703
704 /**
705  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
706  *
707  * @param bitmap address of the bitmap that has to be checked
708  * @param bit the bit number to check
709  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
710  */
711 static uint8_t
712 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
713                       unsigned int bit)
714 {
715   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
716   return 0 != (*bitmap & (1LL << bit));
717 }
718
719
720
721 /**
722  * Function called when Data Message is sent
723  *
724  * @param cls the io_handle corresponding to the Data Message
725  * @param socket the socket which was used
726  */
727 static void
728 write_data_finish_cb (void *cls,
729                       struct GNUNET_STREAM_Socket *socket)
730 {
731   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
732
733   io_handle->sent_packets++;
734 }
735
736
737 /**
738  * Writes data using the given socket. The amount of data written is limited by
739  * the receiver_window_size
740  *
741  * @param socket the socket to use
742  */
743 static void 
744 write_data (struct GNUNET_STREAM_Socket *socket)
745 {
746   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
747   int packet;                   /* Although an int, should never be negative */
748   int ack_packet;
749
750   ack_packet = -1;
751   /* Find the last acknowledged packet */
752   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
753     {      
754       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
755                                               packet))
756         ack_packet = packet;        
757       else if (NULL == io_handle->messages[packet])
758         break;
759     }
760   /* Resend packets which weren't ack'ed */
761   for (packet=0; packet < ack_packet; packet++)
762     {
763       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
764                                              packet))
765         {
766           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767                       "%x: Placing DATA message with sequence %u in send queue\n",
768                       socket->our_id,
769                       ntohl (io_handle->messages[packet]->sequence_number));
770
771           copy_and_queue_message (socket,
772                                   &io_handle->messages[packet]->header,
773                                   NULL,
774                                   NULL);
775         }
776     }
777   packet = ack_packet + 1;
778   /* Now send new packets if there is enough buffer space */
779   while ( (NULL != io_handle->messages[packet]) &&
780           (socket->receiver_window_available 
781            >= ntohs (io_handle->messages[packet]->header.header.size)) )
782     {
783       socket->receiver_window_available -= 
784         ntohs (io_handle->messages[packet]->header.header.size);
785       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786                   "%x: Placing DATA message with sequence %u in send queue\n",
787                   socket->our_id,
788                   ntohl (io_handle->messages[packet]->sequence_number));
789       copy_and_queue_message (socket,
790                               &io_handle->messages[packet]->header,
791                               &write_data_finish_cb,
792                               io_handle);
793       packet++;
794     }
795
796   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
797     socket->retransmission_timeout_task_id = 
798       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
799                                     (GNUNET_TIME_UNIT_SECONDS, 5),
800                                     &retransmission_timeout_task,
801                                     socket);
802 }
803
804
805 /**
806  * Task for calling the read processor
807  *
808  * @param cls the socket
809  * @param tc the task context
810  */
811 static void
812 call_read_processor (void *cls,
813                           const struct GNUNET_SCHEDULER_TaskContext *tc)
814 {
815   struct GNUNET_STREAM_Socket *socket = cls;
816   size_t read_size;
817   size_t valid_read_size;
818   unsigned int packet;
819   uint32_t sequence_increase;
820   uint32_t offset_increase;
821
822   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
823   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
824     return;
825
826   GNUNET_assert (NULL != socket->read_handle);
827   GNUNET_assert (NULL != socket->read_handle->proc);
828
829   /* Check the bitmap for any holes */
830   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
831     {
832       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
833                                              packet))
834         break;
835     }
836   /* We only call read processor if we have the first packet */
837   GNUNET_assert (0 < packet);
838
839   valid_read_size = 
840     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
841
842   GNUNET_assert (0 != valid_read_size);
843
844   /* Cancel the read_io_timeout_task */
845   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
846   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
847
848   /* Call the data processor */
849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850               "%x: Calling read processor\n",
851               socket->our_id);
852   read_size = 
853     socket->read_handle->proc (socket->read_handle->proc_cls,
854                                socket->status,
855                                socket->receive_buffer + socket->copy_offset,
856                                valid_read_size);
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "%x: Read processor read %d bytes\n",
859               socket->our_id,
860               read_size);
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "%x: Read processor completed successfully\n",
863               socket->our_id);
864
865   /* Free the read handle */
866   GNUNET_free (socket->read_handle);
867   socket->read_handle = NULL;
868
869   GNUNET_assert (read_size <= valid_read_size);
870   socket->copy_offset += read_size;
871
872   /* Determine upto which packet we can remove from the buffer */
873   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
874     {
875       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
876         { packet++; break; }
877       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
878         break;
879     }
880
881   /* If no packets can be removed we can't move the buffer */
882   if (0 == packet) return;
883
884   sequence_increase = packet;
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "%x: Sequence increase after read processor completion: %u\n",
887               socket->our_id,
888               sequence_increase);
889
890   /* Shift the data in the receive buffer */
891   memmove (socket->receive_buffer,
892            socket->receive_buffer 
893            + socket->receive_buffer_boundaries[sequence_increase-1],
894            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
895   
896   /* Shift the bitmap */
897   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
898   
899   /* Set read_sequence_number */
900   socket->read_sequence_number += sequence_increase;
901   
902   /* Set read_offset */
903   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
904   socket->read_offset += offset_increase;
905
906   /* Fix copy_offset */
907   GNUNET_assert (offset_increase <= socket->copy_offset);
908   socket->copy_offset -= offset_increase;
909   
910   /* Fix relative boundaries */
911   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
912     {
913       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
914         {
915           socket->receive_buffer_boundaries[packet] = 
916             socket->receive_buffer_boundaries[packet + sequence_increase] 
917             - offset_increase;
918         }
919       else
920         socket->receive_buffer_boundaries[packet] = 0;
921     }
922 }
923
924
925 /**
926  * Cancels the existing read io handle
927  *
928  * @param cls the closure from the SCHEDULER call
929  * @param tc the task context
930  */
931 static void
932 read_io_timeout (void *cls, 
933                 const struct GNUNET_SCHEDULER_TaskContext *tc)
934 {
935   struct GNUNET_STREAM_Socket *socket = cls;
936   GNUNET_STREAM_DataProcessor proc;
937   void *proc_cls;
938
939   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
940   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
941   {
942     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943                 "%x: Read task timedout - Cancelling it\n",
944                 socket->our_id);
945     GNUNET_SCHEDULER_cancel (socket->read_task_id);
946     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
947   }
948   GNUNET_assert (NULL != socket->read_handle);
949   proc = socket->read_handle->proc;
950   proc_cls = socket->read_handle->proc_cls;
951
952   GNUNET_free (socket->read_handle);
953   socket->read_handle = NULL;
954   /* Call the read processor to signal timeout */
955   proc (proc_cls,
956         GNUNET_STREAM_TIMEOUT,
957         NULL,
958         0);
959 }
960
961
962 /**
963  * Handler for DATA messages; Same for both client and server
964  *
965  * @param socket the socket through which the ack was received
966  * @param tunnel connection to the other end
967  * @param sender who sent the message
968  * @param msg the data message
969  * @param atsi performance data for the connection
970  * @return GNUNET_OK to keep the connection open,
971  *         GNUNET_SYSERR to close it (signal serious error)
972  */
973 static int
974 handle_data (struct GNUNET_STREAM_Socket *socket,
975              struct GNUNET_MESH_Tunnel *tunnel,
976              const struct GNUNET_PeerIdentity *sender,
977              const struct GNUNET_STREAM_DataMessage *msg,
978              const struct GNUNET_ATS_Information*atsi)
979 {
980   const void *payload;
981   uint32_t bytes_needed;
982   uint32_t relative_offset;
983   uint32_t relative_sequence_number;
984   uint16_t size;
985
986   size = htons (msg->header.header.size);
987   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
988     {
989       GNUNET_break_op (0);
990       return GNUNET_SYSERR;
991     }
992
993   if (GNUNET_PEER_search (sender) != socket->other_peer)
994     {
995       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996                   "%x: Received DATA from non-confirming peer\n",
997                   socket->our_id);
998       return GNUNET_YES;
999     }
1000
1001   switch (socket->state)
1002     {
1003     case STATE_ESTABLISHED:
1004     case STATE_TRANSMIT_CLOSED:
1005     case STATE_TRANSMIT_CLOSE_WAIT:
1006       
1007       /* check if the message's sequence number is in the range we are
1008          expecting */
1009       relative_sequence_number = 
1010         ntohl (msg->sequence_number) - socket->read_sequence_number;
1011       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1012         {
1013           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014                       "%x: Ignoring received message with sequence number %u\n",
1015                       socket->our_id,
1016                       ntohl (msg->sequence_number));
1017           /* Start ACK sending task if one is not already present */
1018           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1019             {
1020               socket->ack_task_id = 
1021                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1022                                               (msg->ack_deadline),
1023                                               &ack_task,
1024                                               socket);
1025             }
1026           return GNUNET_YES;
1027         }
1028       
1029       /* Check if we have already seen this message */
1030       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1031                                               relative_sequence_number))
1032         {
1033           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034                       "%x: Ignoring already received message with sequence "
1035                       "number %u\n",
1036                       socket->our_id,
1037                       ntohl (msg->sequence_number));
1038           /* Start ACK sending task if one is not already present */
1039           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1040             {
1041               socket->ack_task_id = 
1042                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1043                                               (msg->ack_deadline),
1044                                               &ack_task,
1045                                               socket);
1046             }
1047           return GNUNET_YES;
1048         }
1049
1050       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1051                   "%x: Receiving DATA with sequence number: %u and size: %d "
1052                   "from %x\n",
1053                   socket->our_id,
1054                   ntohl (msg->sequence_number),
1055                   ntohs (msg->header.header.size),
1056                   socket->other_peer);
1057
1058       /* Check if we have to allocate the buffer */
1059       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1060       relative_offset = ntohl (msg->offset) - socket->read_offset;
1061       bytes_needed = relative_offset + size;
1062       if (bytes_needed > socket->receive_buffer_size)
1063         {
1064           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1065             {
1066               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1067                                                        bytes_needed);
1068               socket->receive_buffer_size = bytes_needed;
1069             }
1070           else
1071             {
1072               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073                           "%x: Cannot accommodate packet %d as buffer is",
1074                           "full\n",
1075                           socket->our_id,
1076                           ntohl (msg->sequence_number));
1077               return GNUNET_YES;
1078             }
1079         }
1080       
1081       /* Copy Data to buffer */
1082       payload = &msg[1];
1083       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1084       memcpy (socket->receive_buffer + relative_offset,
1085               payload,
1086               size);
1087       socket->receive_buffer_boundaries[relative_sequence_number] = 
1088         relative_offset + size;
1089       
1090       /* Modify the ACK bitmap */
1091       ackbitmap_modify_bit (&socket->ack_bitmap,
1092                             relative_sequence_number,
1093                             GNUNET_YES);
1094
1095       /* Start ACK sending task if one is not already present */
1096       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1097        {
1098          socket->ack_task_id = 
1099            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1100                                          (msg->ack_deadline),
1101                                          &ack_task,
1102                                          socket);
1103        }
1104
1105       if ((NULL != socket->read_handle) /* A read handle is waiting */
1106           /* There is no current read task */
1107           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1108           /* We have the first packet */
1109           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1110                                                  0)))
1111         {
1112           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                       "%x: Scheduling read processor\n",
1114                       socket->our_id);
1115
1116           socket->read_task_id = 
1117             GNUNET_SCHEDULER_add_now (&call_read_processor,
1118                                       socket);
1119         }
1120       
1121       break;
1122
1123     default:
1124       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125                   "%x: Received data message when it cannot be handled\n",
1126                   socket->our_id);
1127       break;
1128     }
1129   return GNUNET_YES;
1130 }
1131
1132 /**
1133  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1134  *
1135  * @param cls the socket (set from GNUNET_MESH_connect)
1136  * @param tunnel connection to the other end
1137  * @param tunnel_ctx place to store local state associated with the tunnel
1138  * @param sender who sent the message
1139  * @param message the actual message
1140  * @param atsi performance data for the connection
1141  * @return GNUNET_OK to keep the connection open,
1142  *         GNUNET_SYSERR to close it (signal serious error)
1143  */
1144 static int
1145 client_handle_data (void *cls,
1146              struct GNUNET_MESH_Tunnel *tunnel,
1147              void **tunnel_ctx,
1148              const struct GNUNET_PeerIdentity *sender,
1149              const struct GNUNET_MessageHeader *message,
1150              const struct GNUNET_ATS_Information*atsi)
1151 {
1152   struct GNUNET_STREAM_Socket *socket = cls;
1153
1154   return handle_data (socket, 
1155                       tunnel, 
1156                       sender, 
1157                       (const struct GNUNET_STREAM_DataMessage *) message, 
1158                       atsi);
1159 }
1160
1161
1162 /**
1163  * Callback to set state to ESTABLISHED
1164  *
1165  * @param cls the closure from queue_message FIXME: document
1166  * @param socket the socket to requiring state change
1167  */
1168 static void
1169 set_state_established (void *cls,
1170                        struct GNUNET_STREAM_Socket *socket)
1171 {
1172   struct GNUNET_PeerIdentity initiator_pid;
1173
1174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1175               "%x: Attaining ESTABLISHED state\n",
1176               socket->our_id);
1177   socket->write_offset = 0;
1178   socket->read_offset = 0;
1179   socket->state = STATE_ESTABLISHED;
1180   /* FIXME: What if listen_cb is NULL */
1181   if (NULL != socket->lsocket)
1182     {
1183       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1184       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185                   "%x: Calling listen callback\n",
1186                   socket->our_id);
1187       if (GNUNET_SYSERR == 
1188           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1189                                       socket,
1190                                       &initiator_pid))
1191         {
1192           socket->state = STATE_CLOSED;
1193           /* FIXME: We should close in a decent way */
1194           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1195           GNUNET_free (socket);
1196         }
1197     }
1198   else if (socket->open_cb)
1199     socket->open_cb (socket->open_cls, socket);
1200 }
1201
1202
1203 /**
1204  * Callback to set state to HELLO_WAIT
1205  *
1206  * @param cls the closure from queue_message
1207  * @param socket the socket to requiring state change
1208  */
1209 static void
1210 set_state_hello_wait (void *cls,
1211                       struct GNUNET_STREAM_Socket *socket)
1212 {
1213   GNUNET_assert (STATE_INIT == socket->state);
1214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1215               "%x: Attaining HELLO_WAIT state\n",
1216               socket->our_id);
1217   socket->state = STATE_HELLO_WAIT;
1218 }
1219
1220
1221 /**
1222  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1223  * socket
1224  *
1225  * @param socket the socket for which this HelloAckMessage has to be generated
1226  * @return the HelloAckMessage
1227  */
1228 static struct GNUNET_STREAM_HelloAckMessage *
1229 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1230 {
1231   struct GNUNET_STREAM_HelloAckMessage *msg;
1232
1233   /* Get the random sequence number */
1234   socket->write_sequence_number = 
1235     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1237               "%x: Generated write sequence number %u\n",
1238               socket->our_id,
1239               (unsigned int) socket->write_sequence_number);
1240   
1241   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1242   msg->header.header.size = 
1243     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1244   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1245   msg->sequence_number = htonl (socket->write_sequence_number);
1246   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1247
1248   return msg;
1249 }
1250
1251
1252 /**
1253  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1254  *
1255  * @param cls the socket (set from GNUNET_MESH_connect)
1256  * @param tunnel connection to the other end
1257  * @param tunnel_ctx this is NULL
1258  * @param sender who sent the message
1259  * @param message the actual message
1260  * @param atsi performance data for the connection
1261  * @return GNUNET_OK to keep the connection open,
1262  *         GNUNET_SYSERR to close it (signal serious error)
1263  */
1264 static int
1265 client_handle_hello_ack (void *cls,
1266                          struct GNUNET_MESH_Tunnel *tunnel,
1267                          void **tunnel_ctx,
1268                          const struct GNUNET_PeerIdentity *sender,
1269                          const struct GNUNET_MessageHeader *message,
1270                          const struct GNUNET_ATS_Information*atsi)
1271 {
1272   struct GNUNET_STREAM_Socket *socket = cls;
1273   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1274   struct GNUNET_STREAM_HelloAckMessage *reply;
1275
1276   if (GNUNET_PEER_search (sender) != socket->other_peer)
1277     {
1278       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1279                   "%x: Received HELLO_ACK from non-confirming peer\n",
1280                   socket->our_id);
1281       return GNUNET_YES;
1282     }
1283   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1285               "%x: Received HELLO_ACK from %x\n",
1286               socket->our_id,
1287               socket->other_peer);
1288
1289   GNUNET_assert (socket->tunnel == tunnel);
1290   switch (socket->state)
1291   {
1292   case STATE_HELLO_WAIT:
1293     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1294     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295                 "%x: Read sequence number %u\n",
1296                 socket->our_id,
1297                 (unsigned int) socket->read_sequence_number);
1298     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1299     reply = generate_hello_ack_msg (socket);
1300     queue_message (socket,
1301                    &reply->header, 
1302                    &set_state_established, 
1303                    NULL);      
1304     return GNUNET_OK;
1305   case STATE_ESTABLISHED:
1306   case STATE_RECEIVE_CLOSE_WAIT:
1307     // call statistics (# ACKs ignored++)
1308     return GNUNET_OK;
1309   case STATE_INIT:
1310   default:
1311     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1312                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1313                 socket->our_id,
1314                 socket->other_peer,
1315                 socket->state);
1316     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1317     return GNUNET_SYSERR;
1318   }
1319
1320 }
1321
1322
1323 /**
1324  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1325  *
1326  * @param cls the socket (set from GNUNET_MESH_connect)
1327  * @param tunnel connection to the other end
1328  * @param tunnel_ctx this is NULL
1329  * @param sender who sent the message
1330  * @param message the actual message
1331  * @param atsi performance data for the connection
1332  * @return GNUNET_OK to keep the connection open,
1333  *         GNUNET_SYSERR to close it (signal serious error)
1334  */
1335 static int
1336 client_handle_reset (void *cls,
1337                      struct GNUNET_MESH_Tunnel *tunnel,
1338                      void **tunnel_ctx,
1339                      const struct GNUNET_PeerIdentity *sender,
1340                      const struct GNUNET_MessageHeader *message,
1341                      const struct GNUNET_ATS_Information*atsi)
1342 {
1343   struct GNUNET_STREAM_Socket *socket = cls;
1344
1345   return GNUNET_OK;
1346 }
1347
1348
1349 /**
1350  * Common message handler for handling TRANSMIT_CLOSE messages
1351  *
1352  * @param socket the socket through which the ack was received
1353  * @param tunnel connection to the other end
1354  * @param sender who sent the message
1355  * @param msg the transmit close message
1356  * @param atsi performance data for the connection
1357  * @return GNUNET_OK to keep the connection open,
1358  *         GNUNET_SYSERR to close it (signal serious error)
1359  */
1360 static int
1361 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1362                        struct GNUNET_MESH_Tunnel *tunnel,
1363                        const struct GNUNET_PeerIdentity *sender,
1364                        const struct GNUNET_STREAM_MessageHeader *msg,
1365                        const struct GNUNET_ATS_Information*atsi)
1366 {
1367   struct GNUNET_STREAM_MessageHeader *reply;
1368
1369   switch (socket->state)
1370     {
1371     case STATE_ESTABLISHED:
1372       socket->state = STATE_RECEIVE_CLOSED;
1373
1374       /* Send TRANSMIT_CLOSE_ACK */
1375       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1376       reply->header.type = 
1377         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1378       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1379       queue_message (socket, reply, NULL, NULL);
1380       break;
1381
1382     default:
1383       /* FIXME: Call statistics? */
1384       break;
1385     }
1386   return GNUNET_YES;
1387 }
1388
1389
1390 /**
1391  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1392  *
1393  * @param cls the socket (set from GNUNET_MESH_connect)
1394  * @param tunnel connection to the other end
1395  * @param tunnel_ctx this is NULL
1396  * @param sender who sent the message
1397  * @param message the actual message
1398  * @param atsi performance data for the connection
1399  * @return GNUNET_OK to keep the connection open,
1400  *         GNUNET_SYSERR to close it (signal serious error)
1401  */
1402 static int
1403 client_handle_transmit_close (void *cls,
1404                               struct GNUNET_MESH_Tunnel *tunnel,
1405                               void **tunnel_ctx,
1406                               const struct GNUNET_PeerIdentity *sender,
1407                               const struct GNUNET_MessageHeader *message,
1408                               const struct GNUNET_ATS_Information*atsi)
1409 {
1410   struct GNUNET_STREAM_Socket *socket = cls;
1411   
1412   return handle_transmit_close (socket,
1413                                 tunnel,
1414                                 sender,
1415                                 (struct GNUNET_STREAM_MessageHeader *)message,
1416                                 atsi);
1417 }
1418
1419
1420 /**
1421  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1422  *
1423  * @param cls the socket (set from GNUNET_MESH_connect)
1424  * @param tunnel connection to the other end
1425  * @param tunnel_ctx this is NULL
1426  * @param sender who sent the message
1427  * @param message the actual message
1428  * @param atsi performance data for the connection
1429  * @return GNUNET_OK to keep the connection open,
1430  *         GNUNET_SYSERR to close it (signal serious error)
1431  */
1432 static int
1433 client_handle_transmit_close_ack (void *cls,
1434                                   struct GNUNET_MESH_Tunnel *tunnel,
1435                                   void **tunnel_ctx,
1436                                   const struct GNUNET_PeerIdentity *sender,
1437                                   const struct GNUNET_MessageHeader *message,
1438                                   const struct GNUNET_ATS_Information*atsi)
1439 {
1440   struct GNUNET_STREAM_Socket *socket = cls;
1441
1442   return GNUNET_OK;
1443 }
1444
1445
1446 /**
1447  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1448  *
1449  * @param cls the socket (set from GNUNET_MESH_connect)
1450  * @param tunnel connection to the other end
1451  * @param tunnel_ctx this is NULL
1452  * @param sender who sent the message
1453  * @param message the actual message
1454  * @param atsi performance data for the connection
1455  * @return GNUNET_OK to keep the connection open,
1456  *         GNUNET_SYSERR to close it (signal serious error)
1457  */
1458 static int
1459 client_handle_receive_close (void *cls,
1460                              struct GNUNET_MESH_Tunnel *tunnel,
1461                              void **tunnel_ctx,
1462                              const struct GNUNET_PeerIdentity *sender,
1463                              const struct GNUNET_MessageHeader *message,
1464                              const struct GNUNET_ATS_Information*atsi)
1465 {
1466   struct GNUNET_STREAM_Socket *socket = cls;
1467
1468   return GNUNET_OK;
1469 }
1470
1471
1472 /**
1473  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1474  *
1475  * @param cls the socket (set from GNUNET_MESH_connect)
1476  * @param tunnel connection to the other end
1477  * @param tunnel_ctx this is NULL
1478  * @param sender who sent the message
1479  * @param message the actual message
1480  * @param atsi performance data for the connection
1481  * @return GNUNET_OK to keep the connection open,
1482  *         GNUNET_SYSERR to close it (signal serious error)
1483  */
1484 static int
1485 client_handle_receive_close_ack (void *cls,
1486                                  struct GNUNET_MESH_Tunnel *tunnel,
1487                                  void **tunnel_ctx,
1488                                  const struct GNUNET_PeerIdentity *sender,
1489                                  const struct GNUNET_MessageHeader *message,
1490                                  const struct GNUNET_ATS_Information*atsi)
1491 {
1492   struct GNUNET_STREAM_Socket *socket = cls;
1493
1494   return GNUNET_OK;
1495 }
1496
1497
1498 /**
1499  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1500  *
1501  * @param cls the socket (set from GNUNET_MESH_connect)
1502  * @param tunnel connection to the other end
1503  * @param tunnel_ctx this is NULL
1504  * @param sender who sent the message
1505  * @param message the actual message
1506  * @param atsi performance data for the connection
1507  * @return GNUNET_OK to keep the connection open,
1508  *         GNUNET_SYSERR to close it (signal serious error)
1509  */
1510 static int
1511 client_handle_close (void *cls,
1512                      struct GNUNET_MESH_Tunnel *tunnel,
1513                      void **tunnel_ctx,
1514                      const struct GNUNET_PeerIdentity *sender,
1515                      const struct GNUNET_MessageHeader *message,
1516                      const struct GNUNET_ATS_Information*atsi)
1517 {
1518   struct GNUNET_STREAM_Socket *socket = cls;
1519
1520   return GNUNET_OK;
1521 }
1522
1523
1524 /**
1525  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1526  *
1527  * @param cls the socket (set from GNUNET_MESH_connect)
1528  * @param tunnel connection to the other end
1529  * @param tunnel_ctx this is NULL
1530  * @param sender who sent the message
1531  * @param message the actual message
1532  * @param atsi performance data for the connection
1533  * @return GNUNET_OK to keep the connection open,
1534  *         GNUNET_SYSERR to close it (signal serious error)
1535  */
1536 static int
1537 client_handle_close_ack (void *cls,
1538                          struct GNUNET_MESH_Tunnel *tunnel,
1539                          void **tunnel_ctx,
1540                          const struct GNUNET_PeerIdentity *sender,
1541                          const struct GNUNET_MessageHeader *message,
1542                          const struct GNUNET_ATS_Information*atsi)
1543 {
1544   struct GNUNET_STREAM_Socket *socket = cls;
1545
1546   return GNUNET_OK;
1547 }
1548
1549 /*****************************/
1550 /* Server's Message Handlers */
1551 /*****************************/
1552
1553 /**
1554  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1555  *
1556  * @param cls the closure
1557  * @param tunnel connection to the other end
1558  * @param tunnel_ctx the socket
1559  * @param sender who sent the message
1560  * @param message the actual message
1561  * @param atsi performance data for the connection
1562  * @return GNUNET_OK to keep the connection open,
1563  *         GNUNET_SYSERR to close it (signal serious error)
1564  */
1565 static int
1566 server_handle_data (void *cls,
1567                     struct GNUNET_MESH_Tunnel *tunnel,
1568                     void **tunnel_ctx,
1569                     const struct GNUNET_PeerIdentity *sender,
1570                     const struct GNUNET_MessageHeader *message,
1571                     const struct GNUNET_ATS_Information*atsi)
1572 {
1573   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1574
1575   return handle_data (socket,
1576                       tunnel,
1577                       sender,
1578                       (const struct GNUNET_STREAM_DataMessage *)message,
1579                       atsi);
1580 }
1581
1582
1583 /**
1584  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1585  *
1586  * @param cls the closure
1587  * @param tunnel connection to the other end
1588  * @param tunnel_ctx the socket
1589  * @param sender who sent the message
1590  * @param message the actual message
1591  * @param atsi performance data for the connection
1592  * @return GNUNET_OK to keep the connection open,
1593  *         GNUNET_SYSERR to close it (signal serious error)
1594  */
1595 static int
1596 server_handle_hello (void *cls,
1597                      struct GNUNET_MESH_Tunnel *tunnel,
1598                      void **tunnel_ctx,
1599                      const struct GNUNET_PeerIdentity *sender,
1600                      const struct GNUNET_MessageHeader *message,
1601                      const struct GNUNET_ATS_Information*atsi)
1602 {
1603   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1604   struct GNUNET_STREAM_HelloAckMessage *reply;
1605
1606   if (GNUNET_PEER_search (sender) != socket->other_peer)
1607     {
1608       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1609                   "%x: Received HELLO from non-confirming peer\n",
1610                   socket->our_id);
1611       return GNUNET_YES;
1612     }
1613
1614   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1615                  ntohs (message->type));
1616   GNUNET_assert (socket->tunnel == tunnel);
1617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618               "%x: Received HELLO from %x\n", 
1619               socket->our_id,
1620               socket->other_peer);
1621
1622   if (STATE_INIT == socket->state)
1623     {
1624       reply = generate_hello_ack_msg (socket);
1625       queue_message (socket, 
1626                      &reply->header,
1627                      &set_state_hello_wait, 
1628                      NULL);
1629     }
1630   else
1631     {
1632       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1633                   "Client sent HELLO when in state %d\n", socket->state);
1634       /* FIXME: Send RESET? */
1635       
1636     }
1637   return GNUNET_OK;
1638 }
1639
1640
1641 /**
1642  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1643  *
1644  * @param cls the closure
1645  * @param tunnel connection to the other end
1646  * @param tunnel_ctx the socket
1647  * @param sender who sent the message
1648  * @param message the actual message
1649  * @param atsi performance data for the connection
1650  * @return GNUNET_OK to keep the connection open,
1651  *         GNUNET_SYSERR to close it (signal serious error)
1652  */
1653 static int
1654 server_handle_hello_ack (void *cls,
1655                          struct GNUNET_MESH_Tunnel *tunnel,
1656                          void **tunnel_ctx,
1657                          const struct GNUNET_PeerIdentity *sender,
1658                          const struct GNUNET_MessageHeader *message,
1659                          const struct GNUNET_ATS_Information*atsi)
1660 {
1661   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1662   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1663
1664   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1665                  ntohs (message->type));
1666   GNUNET_assert (socket->tunnel == tunnel);
1667   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1668   if (STATE_HELLO_WAIT == socket->state)
1669     {
1670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1671                   "%x: Received HELLO_ACK from %x\n",
1672                   socket->our_id,
1673                   socket->other_peer);
1674       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1676                   "%x: Read sequence number %u\n",
1677                   socket->our_id,
1678                   (unsigned int) socket->read_sequence_number);
1679       socket->receiver_window_available = 
1680         ntohl (ack_message->receiver_window_size);
1681       /* Attain ESTABLISHED state */
1682       set_state_established (NULL, socket);
1683     }
1684   else
1685     {
1686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1687                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1688       /* FIXME: Send RESET? */
1689       
1690     }
1691   return GNUNET_OK;
1692 }
1693
1694
1695 /**
1696  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1697  *
1698  * @param cls the closure
1699  * @param tunnel connection to the other end
1700  * @param tunnel_ctx the socket
1701  * @param sender who sent the message
1702  * @param message the actual message
1703  * @param atsi performance data for the connection
1704  * @return GNUNET_OK to keep the connection open,
1705  *         GNUNET_SYSERR to close it (signal serious error)
1706  */
1707 static int
1708 server_handle_reset (void *cls,
1709                      struct GNUNET_MESH_Tunnel *tunnel,
1710                      void **tunnel_ctx,
1711                      const struct GNUNET_PeerIdentity *sender,
1712                      const struct GNUNET_MessageHeader *message,
1713                      const struct GNUNET_ATS_Information*atsi)
1714 {
1715   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1716
1717   return GNUNET_OK;
1718 }
1719
1720
1721 /**
1722  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1723  *
1724  * @param cls the closure
1725  * @param tunnel connection to the other end
1726  * @param tunnel_ctx the socket
1727  * @param sender who sent the message
1728  * @param message the actual message
1729  * @param atsi performance data for the connection
1730  * @return GNUNET_OK to keep the connection open,
1731  *         GNUNET_SYSERR to close it (signal serious error)
1732  */
1733 static int
1734 server_handle_transmit_close (void *cls,
1735                               struct GNUNET_MESH_Tunnel *tunnel,
1736                               void **tunnel_ctx,
1737                               const struct GNUNET_PeerIdentity *sender,
1738                               const struct GNUNET_MessageHeader *message,
1739                               const struct GNUNET_ATS_Information*atsi)
1740 {
1741   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1742
1743   return handle_transmit_close (socket,
1744                                 tunnel,
1745                                 sender,
1746                                 (struct GNUNET_STREAM_MessageHeader *)message,
1747                                 atsi);
1748 }
1749
1750
1751 /**
1752  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1753  *
1754  * @param cls the closure
1755  * @param tunnel connection to the other end
1756  * @param tunnel_ctx the socket
1757  * @param sender who sent the message
1758  * @param message the actual message
1759  * @param atsi performance data for the connection
1760  * @return GNUNET_OK to keep the connection open,
1761  *         GNUNET_SYSERR to close it (signal serious error)
1762  */
1763 static int
1764 server_handle_transmit_close_ack (void *cls,
1765                                   struct GNUNET_MESH_Tunnel *tunnel,
1766                                   void **tunnel_ctx,
1767                                   const struct GNUNET_PeerIdentity *sender,
1768                                   const struct GNUNET_MessageHeader *message,
1769                                   const struct GNUNET_ATS_Information*atsi)
1770 {
1771   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1772
1773   return GNUNET_OK;
1774 }
1775
1776
1777 /**
1778  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1779  *
1780  * @param cls the closure
1781  * @param tunnel connection to the other end
1782  * @param tunnel_ctx the socket
1783  * @param sender who sent the message
1784  * @param message the actual message
1785  * @param atsi performance data for the connection
1786  * @return GNUNET_OK to keep the connection open,
1787  *         GNUNET_SYSERR to close it (signal serious error)
1788  */
1789 static int
1790 server_handle_receive_close (void *cls,
1791                              struct GNUNET_MESH_Tunnel *tunnel,
1792                              void **tunnel_ctx,
1793                              const struct GNUNET_PeerIdentity *sender,
1794                              const struct GNUNET_MessageHeader *message,
1795                              const struct GNUNET_ATS_Information*atsi)
1796 {
1797   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1798
1799   return GNUNET_OK;
1800 }
1801
1802
1803 /**
1804  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1805  *
1806  * @param cls the closure
1807  * @param tunnel connection to the other end
1808  * @param tunnel_ctx the socket
1809  * @param sender who sent the message
1810  * @param message the actual message
1811  * @param atsi performance data for the connection
1812  * @return GNUNET_OK to keep the connection open,
1813  *         GNUNET_SYSERR to close it (signal serious error)
1814  */
1815 static int
1816 server_handle_receive_close_ack (void *cls,
1817                                  struct GNUNET_MESH_Tunnel *tunnel,
1818                                  void **tunnel_ctx,
1819                                  const struct GNUNET_PeerIdentity *sender,
1820                                  const struct GNUNET_MessageHeader *message,
1821                                  const struct GNUNET_ATS_Information*atsi)
1822 {
1823   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1824
1825   return GNUNET_OK;
1826 }
1827
1828
1829 /**
1830  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1831  *
1832  * @param cls the closure
1833  * @param tunnel connection to the other end
1834  * @param tunnel_ctx the socket
1835  * @param sender who sent the message
1836  * @param message the actual message
1837  * @param atsi performance data for the connection
1838  * @return GNUNET_OK to keep the connection open,
1839  *         GNUNET_SYSERR to close it (signal serious error)
1840  */
1841 static int
1842 server_handle_close (void *cls,
1843                      struct GNUNET_MESH_Tunnel *tunnel,
1844                      void **tunnel_ctx,
1845                      const struct GNUNET_PeerIdentity *sender,
1846                      const struct GNUNET_MessageHeader *message,
1847                      const struct GNUNET_ATS_Information*atsi)
1848 {
1849   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1850
1851   return GNUNET_OK;
1852 }
1853
1854
1855 /**
1856  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1857  *
1858  * @param cls the closure
1859  * @param tunnel connection to the other end
1860  * @param tunnel_ctx the socket
1861  * @param sender who sent the message
1862  * @param message the actual message
1863  * @param atsi performance data for the connection
1864  * @return GNUNET_OK to keep the connection open,
1865  *         GNUNET_SYSERR to close it (signal serious error)
1866  */
1867 static int
1868 server_handle_close_ack (void *cls,
1869                          struct GNUNET_MESH_Tunnel *tunnel,
1870                          void **tunnel_ctx,
1871                          const struct GNUNET_PeerIdentity *sender,
1872                          const struct GNUNET_MessageHeader *message,
1873                          const struct GNUNET_ATS_Information*atsi)
1874 {
1875   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1876
1877   return GNUNET_OK;
1878 }
1879
1880
1881 /**
1882  * Message Handler for mesh
1883  *
1884  * @param socket the socket through which the ack was received
1885  * @param tunnel connection to the other end
1886  * @param sender who sent the message
1887  * @param ack the acknowledgment message
1888  * @param atsi performance data for the connection
1889  * @return GNUNET_OK to keep the connection open,
1890  *         GNUNET_SYSERR to close it (signal serious error)
1891  */
1892 static int
1893 handle_ack (struct GNUNET_STREAM_Socket *socket,
1894             struct GNUNET_MESH_Tunnel *tunnel,
1895             const struct GNUNET_PeerIdentity *sender,
1896             const struct GNUNET_STREAM_AckMessage *ack,
1897             const struct GNUNET_ATS_Information*atsi)
1898 {
1899   unsigned int packet;
1900   int need_retransmission;
1901   
1902
1903   if (GNUNET_PEER_search (sender) != socket->other_peer)
1904     {
1905       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906                   "%x: Received ACK from non-confirming peer\n",
1907                   socket->our_id);
1908       return GNUNET_YES;
1909     }
1910
1911   switch (socket->state)
1912     {
1913     case (STATE_ESTABLISHED):
1914       if (NULL == socket->write_handle)
1915         {
1916           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917                       "%x: Received DATA_ACK when write_handle is NULL\n",
1918                       socket->our_id);
1919           return GNUNET_OK;
1920         }
1921       /* FIXME: increment in the base sequence number is breaking current flow
1922        */
1923       if (!((socket->write_sequence_number 
1924              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1925         {
1926           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1927                       "%x: Received DATA_ACK with unexpected base sequence "
1928                       "number\n",
1929                       socket->our_id);
1930           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1931                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
1932                       socket->our_id,
1933                       socket->write_sequence_number,
1934                       ntohl (ack->base_sequence_number));
1935           return GNUNET_OK;
1936         }
1937       /* FIXME: include the case when write_handle is cancelled - ignore the 
1938          acks */
1939
1940       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1941                   "%x: Received DATA_ACK from %x\n",
1942                   socket->our_id,
1943                   socket->other_peer);
1944       
1945       /* Cancel the retransmission task */
1946       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1947         {
1948           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1949           socket->retransmission_timeout_task_id = 
1950             GNUNET_SCHEDULER_NO_TASK;
1951         }
1952
1953       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1954         {
1955           if (NULL == socket->write_handle->messages[packet]) break;
1956           if (ntohl (ack->base_sequence_number)
1957               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
1958             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1959                                   packet,
1960                                   GNUNET_YES);
1961           else
1962             if (GNUNET_YES == 
1963                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
1964                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
1965                                       - ntohl (ack->base_sequence_number)))
1966               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1967                                     packet,
1968                                     GNUNET_YES);
1969         }
1970
1971       /* Update the receive window remaining
1972        FIXME : Should update with the value from a data ack with greater
1973        sequence number */
1974       socket->receiver_window_available = 
1975         ntohl (ack->receive_window_remaining);
1976
1977       /* Check if we have received all acknowledgements */
1978       need_retransmission = GNUNET_NO;
1979       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1980         {
1981           if (NULL == socket->write_handle->messages[packet]) break;
1982           if (GNUNET_YES != ackbitmap_is_bit_set 
1983               (&socket->write_handle->ack_bitmap,packet))
1984             {
1985               need_retransmission = GNUNET_YES;
1986               break;
1987             }
1988         }
1989       if (GNUNET_YES == need_retransmission)
1990         {
1991           write_data (socket);
1992         }
1993       else      /* We have to call the write continuation callback now */
1994         {
1995           /* Free the packets */
1996           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1997             {
1998               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1999             }
2000           if (NULL != socket->write_handle->write_cont)
2001             socket->write_handle->write_cont
2002               (socket->write_handle->write_cont_cls,
2003                socket->status,
2004                socket->write_handle->size);
2005           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2006                       "%x: Write completion callback completed\n",
2007                       socket->our_id);
2008           /* We are done with the write handle - Freeing it */
2009           GNUNET_free (socket->write_handle);
2010           socket->write_handle = NULL;
2011         }
2012       break;
2013     default:
2014       break;
2015     }
2016   return GNUNET_OK;
2017 }
2018
2019
2020 /**
2021  * Message Handler for mesh
2022  *
2023  * @param cls the 'struct GNUNET_STREAM_Socket'
2024  * @param tunnel connection to the other end
2025  * @param tunnel_ctx unused
2026  * @param sender who sent the message
2027  * @param message the actual message
2028  * @param atsi performance data for the connection
2029  * @return GNUNET_OK to keep the connection open,
2030  *         GNUNET_SYSERR to close it (signal serious error)
2031  */
2032 static int
2033 client_handle_ack (void *cls,
2034                    struct GNUNET_MESH_Tunnel *tunnel,
2035                    void **tunnel_ctx,
2036                    const struct GNUNET_PeerIdentity *sender,
2037                    const struct GNUNET_MessageHeader *message,
2038                    const struct GNUNET_ATS_Information*atsi)
2039 {
2040   struct GNUNET_STREAM_Socket *socket = cls;
2041   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2042  
2043   return handle_ack (socket, tunnel, sender, ack, atsi);
2044 }
2045
2046
2047 /**
2048  * Message Handler for mesh
2049  *
2050  * @param cls the server's listen socket
2051  * @param tunnel connection to the other end
2052  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2053  * @param sender who sent the message
2054  * @param message the actual message
2055  * @param atsi performance data for the connection
2056  * @return GNUNET_OK to keep the connection open,
2057  *         GNUNET_SYSERR to close it (signal serious error)
2058  */
2059 static int
2060 server_handle_ack (void *cls,
2061                    struct GNUNET_MESH_Tunnel *tunnel,
2062                    void **tunnel_ctx,
2063                    const struct GNUNET_PeerIdentity *sender,
2064                    const struct GNUNET_MessageHeader *message,
2065                    const struct GNUNET_ATS_Information*atsi)
2066 {
2067   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2068   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2069  
2070   return handle_ack (socket, tunnel, sender, ack, atsi);
2071 }
2072
2073
2074 /**
2075  * For client message handlers, the stream socket is in the
2076  * closure argument.
2077  */
2078 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2079   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2080   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2081    sizeof (struct GNUNET_STREAM_AckMessage) },
2082   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2083    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2084   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2085    sizeof (struct GNUNET_STREAM_MessageHeader)},
2086   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2087    sizeof (struct GNUNET_STREAM_MessageHeader)},
2088   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2089    sizeof (struct GNUNET_STREAM_MessageHeader)},
2090   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2091    sizeof (struct GNUNET_STREAM_MessageHeader)},
2092   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2093    sizeof (struct GNUNET_STREAM_MessageHeader)},
2094   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2095    sizeof (struct GNUNET_STREAM_MessageHeader)},
2096   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2097    sizeof (struct GNUNET_STREAM_MessageHeader)},
2098   {NULL, 0, 0}
2099 };
2100
2101
2102 /**
2103  * For server message handlers, the stream socket is in the
2104  * tunnel context, and the listen socket in the closure argument.
2105  */
2106 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2107   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2108   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2109    sizeof (struct GNUNET_STREAM_AckMessage) },
2110   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2111    sizeof (struct GNUNET_STREAM_MessageHeader)},
2112   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2113    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2114   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2115    sizeof (struct GNUNET_STREAM_MessageHeader)},
2116   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2117    sizeof (struct GNUNET_STREAM_MessageHeader)},
2118   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2119    sizeof (struct GNUNET_STREAM_MessageHeader)},
2120   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2121    sizeof (struct GNUNET_STREAM_MessageHeader)},
2122   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2123    sizeof (struct GNUNET_STREAM_MessageHeader)},
2124   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2125    sizeof (struct GNUNET_STREAM_MessageHeader)},
2126   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2127    sizeof (struct GNUNET_STREAM_MessageHeader)},
2128   {NULL, 0, 0}
2129 };
2130
2131
2132 /**
2133  * Function called when our target peer is connected to our tunnel
2134  *
2135  * @param cls the socket for which this tunnel is created
2136  * @param peer the peer identity of the target
2137  * @param atsi performance data for the connection
2138  */
2139 static void
2140 mesh_peer_connect_callback (void *cls,
2141                             const struct GNUNET_PeerIdentity *peer,
2142                             const struct GNUNET_ATS_Information * atsi)
2143 {
2144   struct GNUNET_STREAM_Socket *socket = cls;
2145   struct GNUNET_STREAM_MessageHeader *message;
2146   GNUNET_PEER_Id connected_peer;
2147
2148   connected_peer = GNUNET_PEER_search (peer);
2149   
2150   if (connected_peer != socket->other_peer)
2151     {
2152       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2153                   "%x: A peer which is not our target has connected",
2154                   "to our tunnel\n",
2155                   socket->our_id);
2156       return;
2157     }
2158   
2159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2160               "%x: Target peer %x connected\n", 
2161               socket->our_id,
2162               connected_peer);
2163   
2164   /* Set state to INIT */
2165   socket->state = STATE_INIT;
2166
2167   /* Send HELLO message */
2168   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2169   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2170   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2171   queue_message (socket,
2172                  message,
2173                  &set_state_hello_wait,
2174                  NULL);
2175
2176   /* Call open callback */
2177   if (NULL == socket->open_cb)
2178     {
2179       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2180                   "STREAM_open callback is NULL\n");
2181     }
2182 }
2183
2184
2185 /**
2186  * Function called when our target peer is disconnected from our tunnel
2187  *
2188  * @param cls the socket associated which this tunnel
2189  * @param peer the peer identity of the target
2190  */
2191 static void
2192 mesh_peer_disconnect_callback (void *cls,
2193                                const struct GNUNET_PeerIdentity *peer)
2194 {
2195
2196 }
2197
2198
2199 /**
2200  * Method called whenever a peer creates a tunnel to us
2201  *
2202  * @param cls closure
2203  * @param tunnel new handle to the tunnel
2204  * @param initiator peer that started the tunnel
2205  * @param atsi performance information for the tunnel
2206  * @return initial tunnel context for the tunnel
2207  *         (can be NULL -- that's not an error)
2208  */
2209 static void *
2210 new_tunnel_notify (void *cls,
2211                    struct GNUNET_MESH_Tunnel *tunnel,
2212                    const struct GNUNET_PeerIdentity *initiator,
2213                    const struct GNUNET_ATS_Information *atsi)
2214 {
2215   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2216   struct GNUNET_STREAM_Socket *socket;
2217
2218   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2219      from the same peer again until the socket is closed */
2220
2221   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2222   socket->other_peer = GNUNET_PEER_intern (initiator);
2223   socket->tunnel = tunnel;
2224   socket->session_id = 0;       /* FIXME */
2225   socket->state = STATE_INIT;
2226   socket->lsocket = lsocket;
2227   socket->our_id = lsocket->our_id;
2228   
2229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2230               "%x: Peer %x initiated tunnel to us\n", 
2231               socket->our_id,
2232               socket->other_peer);
2233   
2234   /* FIXME: Copy MESH handle from lsocket to socket */
2235   
2236   return socket;
2237 }
2238
2239
2240 /**
2241  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2242  * any associated state.  This function is NOT called if the client has
2243  * explicitly asked for the tunnel to be destroyed using
2244  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2245  * the tunnel.
2246  *
2247  * @param cls closure (set from GNUNET_MESH_connect)
2248  * @param tunnel connection to the other end (henceforth invalid)
2249  * @param tunnel_ctx place where local state associated
2250  *                   with the tunnel is stored
2251  */
2252 static void 
2253 tunnel_cleaner (void *cls,
2254                 const struct GNUNET_MESH_Tunnel *tunnel,
2255                 void *tunnel_ctx)
2256 {
2257   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2258   
2259   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2260               "%x: Peer %x has terminated connection abruptly\n",
2261               socket->our_id,
2262               socket->other_peer);
2263
2264   socket->status = GNUNET_STREAM_SHUTDOWN;
2265
2266   /* Clear Transmit handles */
2267   if (NULL != socket->transmit_handle)
2268     {
2269       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2270       socket->transmit_handle = NULL;
2271     }
2272   socket->tunnel = NULL;
2273 }
2274
2275
2276 /*****************/
2277 /* API functions */
2278 /*****************/
2279
2280
2281 /**
2282  * Tries to open a stream to the target peer
2283  *
2284  * @param cfg configuration to use
2285  * @param target the target peer to which the stream has to be opened
2286  * @param app_port the application port number which uniquely identifies this
2287  *            stream
2288  * @param open_cb this function will be called after stream has be established 
2289  * @param open_cb_cls the closure for open_cb
2290  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2291  * @return if successful it returns the stream socket; NULL if stream cannot be
2292  *         opened 
2293  */
2294 struct GNUNET_STREAM_Socket *
2295 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2296                     const struct GNUNET_PeerIdentity *target,
2297                     GNUNET_MESH_ApplicationType app_port,
2298                     GNUNET_STREAM_OpenCallback open_cb,
2299                     void *open_cb_cls,
2300                     ...)
2301 {
2302   struct GNUNET_STREAM_Socket *socket;
2303   struct GNUNET_PeerIdentity own_peer_id;
2304   enum GNUNET_STREAM_Option option;
2305   va_list vargs;                /* Variable arguments */
2306
2307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2308               "%s\n", __func__);
2309
2310   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2311   socket->other_peer = GNUNET_PEER_intern (target);
2312   socket->open_cb = open_cb;
2313   socket->open_cls = open_cb_cls;
2314   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2315   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2316   
2317   /* Set defaults */
2318   socket->retransmit_timeout = 
2319     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2320
2321   va_start (vargs, open_cb_cls); /* Parse variable args */
2322   do {
2323     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2324     switch (option)
2325       {
2326       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2327         /* Expect struct GNUNET_TIME_Relative */
2328         socket->retransmit_timeout = va_arg (vargs,
2329                                              struct GNUNET_TIME_Relative);
2330         break;
2331       case GNUNET_STREAM_OPTION_END:
2332         break;
2333       }
2334   } while (GNUNET_STREAM_OPTION_END != option);
2335   va_end (vargs);               /* End of variable args parsing */
2336   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2337                                       10,  /* QUEUE size as parameter? */
2338                                       socket, /* cls */
2339                                       NULL, /* No inbound tunnel handler */
2340                                       &tunnel_cleaner, /* FIXME: not required? */
2341                                       client_message_handlers,
2342                                       &app_port); /* We don't get inbound tunnels */
2343   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2344     {
2345       GNUNET_free (socket);
2346       return NULL;
2347     }
2348
2349   /* Now create the mesh tunnel to target */
2350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2351               "Creating MESH Tunnel\n");
2352   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2353                                               NULL, /* Tunnel context */
2354                                               &mesh_peer_connect_callback,
2355                                               &mesh_peer_disconnect_callback,
2356                                               socket);
2357   GNUNET_assert (NULL != socket->tunnel);
2358   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2359                                         target);
2360   
2361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362               "%s() END\n", __func__);
2363   return socket;
2364 }
2365
2366
2367 /**
2368  * Shutdown the stream for reading or writing (man 2 shutdown).
2369  *
2370  * @param socket the stream socket
2371  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2372  */
2373 void
2374 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2375                         int how)
2376 {
2377   return;
2378 }
2379
2380
2381 /**
2382  * Closes the stream
2383  *
2384  * @param socket the stream socket
2385  */
2386 void
2387 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2388 {
2389   struct MessageQueue *head;
2390
2391   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2392     {
2393       /* socket closed with read task pending!? */
2394       GNUNET_break (0);
2395       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2396       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2397     }
2398   
2399   /* Terminate the ack'ing tasks if they are still present */
2400   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2401     {
2402       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2403       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2404     }
2405
2406   /* Clear Transmit handles */
2407   if (NULL != socket->transmit_handle)
2408     {
2409       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2410       socket->transmit_handle = NULL;
2411     }
2412
2413   /* Clear existing message queue */
2414   while (NULL != (head = socket->queue_head)) {
2415     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2416                                  socket->queue_tail,
2417                                  head);
2418     GNUNET_free (head->message);
2419     GNUNET_free (head);
2420   }
2421
2422   /* Close associated tunnel */
2423   if (NULL != socket->tunnel)
2424     {
2425       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2426       socket->tunnel = NULL;
2427     }
2428
2429   /* Close mesh connection */
2430   if (NULL != socket->mesh && NULL == socket->lsocket)
2431     {
2432       GNUNET_MESH_disconnect (socket->mesh);
2433       socket->mesh = NULL;
2434     }
2435   
2436   /* Release receive buffer */
2437   if (NULL != socket->receive_buffer)
2438     {
2439       GNUNET_free (socket->receive_buffer);
2440     }
2441
2442   GNUNET_free (socket);
2443 }
2444
2445
2446 /**
2447  * Listens for stream connections for a specific application ports
2448  *
2449  * @param cfg the configuration to use
2450  * @param app_port the application port for which new streams will be accepted
2451  * @param listen_cb this function will be called when a peer tries to establish
2452  *            a stream with us
2453  * @param listen_cb_cls closure for listen_cb
2454  * @return listen socket, NULL for any error
2455  */
2456 struct GNUNET_STREAM_ListenSocket *
2457 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2458                       GNUNET_MESH_ApplicationType app_port,
2459                       GNUNET_STREAM_ListenCallback listen_cb,
2460                       void *listen_cb_cls)
2461 {
2462   /* FIXME: Add variable args for passing configration options? */
2463   struct GNUNET_STREAM_ListenSocket *lsocket;
2464   struct GNUNET_PeerIdentity our_peer_id;
2465
2466   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2467   lsocket->port = app_port;
2468   lsocket->listen_cb = listen_cb;
2469   lsocket->listen_cb_cls = listen_cb_cls;
2470   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2471   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2472   lsocket->mesh = GNUNET_MESH_connect (cfg,
2473                                        10, /* FIXME: QUEUE size as parameter? */
2474                                        lsocket, /* Closure */
2475                                        &new_tunnel_notify,
2476                                        &tunnel_cleaner,
2477                                        server_message_handlers,
2478                                        &app_port);
2479   GNUNET_assert (NULL != lsocket->mesh);
2480   return lsocket;
2481 }
2482
2483
2484 /**
2485  * Closes the listen socket
2486  *
2487  * @param lsocket the listen socket
2488  */
2489 void
2490 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2491 {
2492   /* Close MESH connection */
2493   GNUNET_assert (NULL != lsocket->mesh);
2494   GNUNET_MESH_disconnect (lsocket->mesh);
2495   
2496   GNUNET_free (lsocket);
2497 }
2498
2499
2500 /**
2501  * Tries to write the given data to the stream
2502  *
2503  * @param socket the socket representing a stream
2504  * @param data the data buffer from where the data is written into the stream
2505  * @param size the number of bytes to be written from the data buffer
2506  * @param timeout the timeout period
2507  * @param write_cont the function to call upon writing some bytes into the stream
2508  * @param write_cont_cls the closure
2509  * @return handle to cancel the operation
2510  */
2511 struct GNUNET_STREAM_IOWriteHandle *
2512 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2513                      const void *data,
2514                      size_t size,
2515                      struct GNUNET_TIME_Relative timeout,
2516                      GNUNET_STREAM_CompletionContinuation write_cont,
2517                      void *write_cont_cls)
2518 {
2519   unsigned int num_needed_packets;
2520   unsigned int packet;
2521   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2522   uint32_t packet_size;
2523   uint32_t payload_size;
2524   struct GNUNET_STREAM_DataMessage *data_msg;
2525   const void *sweep;
2526   struct GNUNET_TIME_Relative ack_deadline;
2527
2528   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2529               "%s\n", __func__);
2530
2531   /* Return NULL if there is already a write request pending */
2532   if (NULL != socket->write_handle)
2533   {
2534     GNUNET_break (0);
2535     return NULL;
2536   }
2537   if (!((STATE_ESTABLISHED == socket->state)
2538         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2539         || (STATE_RECEIVE_CLOSED == socket->state)))
2540     {
2541       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2542                   "%x: Attempting to write on a closed (OR) not-yet-established"
2543                   "stream\n",
2544                   socket->our_id);
2545       return NULL;
2546     } 
2547   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2548     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2549   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2550   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2551   io_handle->write_cont = write_cont;
2552   io_handle->write_cont_cls = write_cont_cls;
2553   io_handle->size = size;
2554   sweep = data;
2555   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2556      determined from RTT */
2557   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2558   /* Divide the given buffer into packets for sending */
2559   for (packet=0; packet < num_needed_packets; packet++)
2560     {
2561       if ((packet + 1) * max_payload_size < size) 
2562         {
2563           payload_size = max_payload_size;
2564           packet_size = MAX_PACKET_SIZE;
2565         }
2566       else 
2567         {
2568           payload_size = size - packet * max_payload_size;
2569           packet_size =  payload_size + sizeof (struct
2570                                                 GNUNET_STREAM_DataMessage); 
2571         }
2572       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2573       io_handle->messages[packet]->header.header.size = htons (packet_size);
2574       io_handle->messages[packet]->header.header.type =
2575         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2576       io_handle->messages[packet]->sequence_number =
2577         htonl (socket->write_sequence_number++);
2578       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2579
2580       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2581          determined from RTT */
2582       io_handle->messages[packet]->ack_deadline =
2583         GNUNET_TIME_relative_hton (ack_deadline);
2584       data_msg = io_handle->messages[packet];
2585       /* Copy data from given buffer to the packet */
2586       memcpy (&data_msg[1],
2587               sweep,
2588               payload_size);
2589       sweep += payload_size;
2590       socket->write_offset += payload_size;
2591     }
2592   socket->write_handle = io_handle;
2593   write_data (socket);
2594
2595   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2596               "%s() END\n", __func__);
2597
2598   return io_handle;
2599 }
2600
2601
2602 /**
2603  * Tries to read data from the stream
2604  *
2605  * @param socket the socket representing a stream
2606  * @param timeout the timeout period
2607  * @param proc function to call with data (once only)
2608  * @param proc_cls the closure for proc
2609  * @return handle to cancel the operation
2610  */
2611 struct GNUNET_STREAM_IOReadHandle *
2612 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2613                     struct GNUNET_TIME_Relative timeout,
2614                     GNUNET_STREAM_DataProcessor proc,
2615                     void *proc_cls)
2616 {
2617   struct GNUNET_STREAM_IOReadHandle *read_handle;
2618   
2619   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2620               "%x: %s()\n", 
2621               socket->our_id,
2622               __func__);
2623
2624   /* Return NULL if there is already a read handle; the user has to cancel that
2625   first before continuing or has to wait until it is completed */
2626   if (NULL != socket->read_handle) return NULL;
2627
2628   GNUNET_assert (NULL != proc);
2629
2630   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2631   read_handle->proc = proc;
2632   read_handle->proc_cls = proc_cls;
2633   socket->read_handle = read_handle;
2634
2635   /* Check if we have a packet at bitmap 0 */
2636   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2637                                           0))
2638     {
2639       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2640                                                        socket);
2641    
2642     }
2643   
2644   /* Setup the read timeout task */
2645   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2646                                                                &read_io_timeout,
2647                                                                socket);
2648   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2649               "%x: %s() END\n",
2650               socket->our_id,
2651               __func__);
2652   return read_handle;
2653 }
2654
2655
2656 /**
2657  * Cancel pending write operation.
2658  *
2659  * @param ioh handle to operation to cancel
2660  */
2661 void
2662 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2663 {
2664   /* FIXME: Should cancel the write retransmission task */
2665   return;
2666 }
2667
2668
2669 /**
2670  * Cancel pending read operation.
2671  *
2672  * @param ioh handle to operation to cancel
2673  */
2674 void
2675 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2676 {
2677   return;
2678 }