f9895ae771fcb5d5cc726c6c8b57a9e5d21d2428
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * The listen socket from which this socket is derived. Should be NULL if it
235    * is not a derived socket
236    */
237   struct GNUNET_STREAM_ListenSocket *lsocket;
238
239   /**
240    * Task identifier for the read io timeout task
241    */
242   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
243
244   /**
245    * Task identifier for retransmission task after timeout
246    */
247   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
248
249   /**
250    * The task for sending timely Acks
251    */
252   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
253
254   /**
255    * Task scheduled to continue a read operation.
256    */
257   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
258
259   /**
260    * The state of the protocol associated with this socket
261    */
262   enum State state;
263
264   /**
265    * The status of the socket
266    */
267   enum GNUNET_STREAM_Status status;
268
269   /**
270    * The number of previous timeouts; FIXME: currently not used
271    */
272   unsigned int retries;
273
274   /**
275    * The peer identity of the peer at the other end of the stream
276    */
277   GNUNET_PEER_Id other_peer;
278
279   /**
280    * Our Peer Identity (for debugging)
281    */
282   GNUNET_PEER_Id our_id;
283
284   /**
285    * The application port number (type: uint32_t)
286    */
287   GNUNET_MESH_ApplicationType app_port;
288
289   /**
290    * The session id associated with this stream connection
291    * FIXME: Not used currently, may be removed
292    */
293   uint32_t session_id;
294
295   /**
296    * Write sequence number. Set to random when sending HELLO(client) and
297    * HELLO_ACK(server) 
298    */
299   uint32_t write_sequence_number;
300
301   /**
302    * Read sequence number. This number's value is determined during handshake
303    */
304   uint32_t read_sequence_number;
305
306   /**
307    * The receiver buffer size
308    */
309   uint32_t receive_buffer_size;
310
311   /**
312    * The receiver buffer boundaries
313    */
314   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
315
316   /**
317    * receiver's available buffer after the last acknowledged packet
318    */
319   uint32_t receiver_window_available;
320
321   /**
322    * The offset pointer used during write operation
323    */
324   uint32_t write_offset;
325
326   /**
327    * The offset after which we are expecting data
328    */
329   uint32_t read_offset;
330
331   /**
332    * The offset upto which user has read from the received buffer
333    */
334   uint32_t copy_offset;
335 };
336
337
338 /**
339  * A socket for listening
340  */
341 struct GNUNET_STREAM_ListenSocket
342 {
343   /**
344    * The mesh handle
345    */
346   struct GNUNET_MESH_Handle *mesh;
347
348   /**
349    * The callback function which is called after successful opening socket
350    */
351   GNUNET_STREAM_ListenCallback listen_cb;
352
353   /**
354    * The call back closure
355    */
356   void *listen_cb_cls;
357
358   /**
359    * Our interned Peer's identity
360    */
361   GNUNET_PEER_Id our_id;
362
363   /**
364    * The service port
365    * FIXME: Remove if not required!
366    */
367   GNUNET_MESH_ApplicationType port;
368 };
369
370
371 /**
372  * The IO Write Handle
373  */
374 struct GNUNET_STREAM_IOWriteHandle
375 {
376   /**
377    * The packet_buffers associated with this Handle
378    */
379   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
380
381   /**
382    * The write continuation callback
383    */
384   GNUNET_STREAM_CompletionContinuation write_cont;
385
386   /**
387    * Write continuation closure
388    */
389   void *write_cont_cls;
390
391   /**
392    * The bitmap of this IOHandle; Corresponding bit for a message is set when
393    * it has been acknowledged by the receiver
394    */
395   GNUNET_STREAM_AckBitmap ack_bitmap;
396
397   /**
398    * Number of bytes in this write handle
399    */
400   size_t size;
401
402   /**
403    * Number of packets sent before waiting for an ack
404    *
405    * FIXME: Do we need this?
406    */
407   unsigned int sent_packets;
408 };
409
410
411 /**
412  * The IO Read Handle
413  */
414 struct GNUNET_STREAM_IOReadHandle
415 {
416   /**
417    * Callback for the read processor
418    */
419   GNUNET_STREAM_DataProcessor proc;
420
421   /**
422    * The closure pointer for the read processor callback
423    */
424   void *proc_cls;
425 };
426
427
428 /**
429  * Default value in seconds for various timeouts
430  */
431 static unsigned int default_timeout = 10;
432
433 /**
434  * Callback function for sending queued message
435  *
436  * @param cls closure the socket
437  * @param size number of bytes available in buf
438  * @param buf where the callee should write the message
439  * @return number of bytes written to buf
440  */
441 static size_t
442 send_message_notify (void *cls, size_t size, void *buf)
443 {
444   struct GNUNET_STREAM_Socket *socket = cls;
445   struct GNUNET_PeerIdentity target;
446   struct MessageQueue *head;
447   size_t ret;
448
449   socket->transmit_handle = NULL; /* Remove the transmit handle */
450   head = socket->queue_head;
451   if (NULL == head)
452     return 0; /* just to be safe */
453   GNUNET_PEER_resolve (socket->other_peer, &target);
454   if (0 == size)                /* request timed out */
455     {
456       socket->retries++;
457       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458                   "Message sending timed out. Retry %d \n",
459                   socket->retries);
460       socket->transmit_handle = 
461         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
462                                            0, /* Corking */
463                                            1, /* Priority */
464                                            /* FIXME: exponential backoff */
465                                            socket->retransmit_timeout,
466                                            &target,
467                                            ntohs (head->message->header.size),
468                                            &send_message_notify,
469                                            socket);
470       return 0;
471     }
472
473   ret = ntohs (head->message->header.size);
474   GNUNET_assert (size >= ret);
475   memcpy (buf, head->message, ret);
476   if (NULL != head->finish_cb)
477     {
478       head->finish_cb (head->finish_cb_cls, socket);
479     }
480   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
481                                socket->queue_tail,
482                                head);
483   GNUNET_free (head->message);
484   GNUNET_free (head);
485   head = socket->queue_head;
486   if (NULL != head)    /* more pending messages to send */
487     {
488       socket->retries = 0;
489       socket->transmit_handle = 
490         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
491                                            0, /* Corking */
492                                            1, /* Priority */
493                                            /* FIXME: exponential backoff */
494                                            socket->retransmit_timeout,
495                                            &target,
496                                            ntohs (head->message->header.size),
497                                            &send_message_notify,
498                                            socket);
499     }
500   return ret;
501 }
502
503
504 /**
505  * Queues a message for sending using the mesh connection of a socket
506  *
507  * @param socket the socket whose mesh connection is used
508  * @param message the message to be sent
509  * @param finish_cb the callback to be called when the message is sent
510  * @param finish_cb_cls the closure for the callback
511  */
512 static void
513 queue_message (struct GNUNET_STREAM_Socket *socket,
514                struct GNUNET_STREAM_MessageHeader *message,
515                SendFinishCallback finish_cb,
516                void *finish_cb_cls)
517 {
518   struct MessageQueue *queue_entity;
519   struct GNUNET_PeerIdentity target;
520
521   GNUNET_assert 
522     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
523      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
524
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526               "%x: Queueing message of type %d and size %d\n",
527               socket->our_id,
528               ntohs (message->header.type),
529               ntohs (message->header.size));
530   GNUNET_assert (NULL != message);
531   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
532   queue_entity->message = message;
533   queue_entity->finish_cb = finish_cb;
534   queue_entity->finish_cb_cls = finish_cb_cls;
535   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
536                                     socket->queue_tail,
537                                     queue_entity);
538   if (NULL == socket->transmit_handle)
539   {
540     socket->retries = 0;
541     GNUNET_PEER_resolve (socket->other_peer, &target);
542     socket->transmit_handle = 
543       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
544                                          0, /* Corking */
545                                          1, /* Priority */
546                                          socket->retransmit_timeout,
547                                          &target,
548                                          ntohs (message->header.size),
549                                          &send_message_notify,
550                                          socket);
551   }
552 }
553
554
555 /**
556  * Copies a message and queues it for sending using the mesh connection of
557  * given socket 
558  *
559  * @param socket the socket whose mesh connection is used
560  * @param message the message to be sent
561  * @param finish_cb the callback to be called when the message is sent
562  * @param finish_cb_cls the closure for the callback
563  */
564 static void
565 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
566                         const struct GNUNET_STREAM_MessageHeader *message,
567                         SendFinishCallback finish_cb,
568                         void *finish_cb_cls)
569 {
570   struct GNUNET_STREAM_MessageHeader *msg_copy;
571   uint16_t size;
572   
573   size = ntohs (message->header.size);
574   msg_copy = GNUNET_malloc (size);
575   memcpy (msg_copy, message, size);
576   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
577 }
578
579
580 /**
581  * Callback function for sending ack message
582  *
583  * @param cls closure the ACK message created in ack_task
584  * @param size number of bytes available in buffer
585  * @param buf where the callee should write the message
586  * @return number of bytes written to buf
587  */
588 static size_t
589 send_ack_notify (void *cls, size_t size, void *buf)
590 {
591   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
592
593   if (0 == size)
594     {
595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596                   "%s called with size 0\n", __func__);
597       return 0;
598     }
599   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
600   
601   size = ntohs (ack_msg->header.header.size);
602   memcpy (buf, ack_msg, size);
603   return size;
604 }
605
606 /**
607  * Writes data using the given socket. The amount of data written is limited by
608  * the receiver_window_size
609  *
610  * @param socket the socket to use
611  */
612 static void 
613 write_data (struct GNUNET_STREAM_Socket *socket);
614
615 /**
616  * Task for retransmitting data messages if they aren't ACK before their ack
617  * deadline 
618  *
619  * @param cls the socket
620  * @param tc the Task context
621  */
622 static void
623 retransmission_timeout_task (void *cls,
624                              const struct GNUNET_SCHEDULER_TaskContext *tc)
625 {
626   struct GNUNET_STREAM_Socket *socket = cls;
627   
628   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
629     return;
630
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "%x: Retransmitting DATA...\n", socket->our_id);
633   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
634   write_data (socket);
635 }
636
637
638 /**
639  * Task for sending ACK message
640  *
641  * @param cls the socket
642  * @param tc the Task context
643  */
644 static void
645 ack_task (void *cls,
646           const struct GNUNET_SCHEDULER_TaskContext *tc)
647 {
648   struct GNUNET_STREAM_Socket *socket = cls;
649   struct GNUNET_STREAM_AckMessage *ack_msg;
650   struct GNUNET_PeerIdentity target;
651
652   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
653     {
654       return;
655     }
656
657   socket->ack_task_id = 0;
658
659   /* Create the ACK Message */
660   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
661   ack_msg->header.header.size = htons (sizeof (struct 
662                                                GNUNET_STREAM_AckMessage));
663   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
664   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
665   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
666   ack_msg->receive_window_remaining = 
667     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
668
669   GNUNET_PEER_resolve (socket->other_peer, &target);
670   /* Request MESH for sending ACK */
671   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
672                                      0, /* Corking */
673                                      1, /* Priority */
674                                      socket->retransmit_timeout,
675                                      &target,
676                                      ntohs (ack_msg->header.header.size),
677                                      &send_ack_notify,
678                                      ack_msg);
679
680   
681 }
682
683
684 /**
685  * Function to modify a bit in GNUNET_STREAM_AckBitmap
686  *
687  * @param bitmap the bitmap to modify
688  * @param bit the bit number to modify
689  * @param value GNUNET_YES to on, GNUNET_NO to off
690  */
691 static void
692 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
693                       unsigned int bit, 
694                       int value)
695 {
696   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
697   if (GNUNET_YES == value)
698     *bitmap |= (1LL << bit);
699   else
700     *bitmap &= ~(1LL << bit);
701 }
702
703
704 /**
705  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
706  *
707  * @param bitmap address of the bitmap that has to be checked
708  * @param bit the bit number to check
709  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
710  */
711 static uint8_t
712 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
713                       unsigned int bit)
714 {
715   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
716   return 0 != (*bitmap & (1LL << bit));
717 }
718
719
720
721 /**
722  * Function called when Data Message is sent
723  *
724  * @param cls the io_handle corresponding to the Data Message
725  * @param socket the socket which was used
726  */
727 static void
728 write_data_finish_cb (void *cls,
729                       struct GNUNET_STREAM_Socket *socket)
730 {
731   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
732
733   io_handle->sent_packets++;
734 }
735
736
737 /**
738  * Writes data using the given socket. The amount of data written is limited by
739  * the receiver_window_size
740  *
741  * @param socket the socket to use
742  */
743 static void 
744 write_data (struct GNUNET_STREAM_Socket *socket)
745 {
746   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
747   int packet;                   /* Although an int, should never be negative */
748   int ack_packet;
749
750   ack_packet = -1;
751   /* Find the last acknowledged packet */
752   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
753     {      
754       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
755                                               packet))
756         ack_packet = packet;        
757       else if (NULL == io_handle->messages[packet])
758         break;
759     }
760   /* Resend packets which weren't ack'ed */
761   for (packet=0; packet < ack_packet; packet++)
762     {
763       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
764                                              packet))
765         {
766           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767                       "%x: Placing DATA message with sequence %u in send queue\n",
768                       socket->our_id,
769                       ntohl (io_handle->messages[packet]->sequence_number));
770
771           copy_and_queue_message (socket,
772                                   &io_handle->messages[packet]->header,
773                                   NULL,
774                                   NULL);
775         }
776     }
777   packet = ack_packet + 1;
778   /* Now send new packets if there is enough buffer space */
779   while ( (NULL != io_handle->messages[packet]) &&
780           (socket->receiver_window_available 
781            >= ntohs (io_handle->messages[packet]->header.header.size)) )
782     {
783       socket->receiver_window_available -= 
784         ntohs (io_handle->messages[packet]->header.header.size);
785       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786                   "%x: Placing DATA message with sequence %u in send queue\n",
787                   socket->our_id,
788                   ntohl (io_handle->messages[packet]->sequence_number));
789       copy_and_queue_message (socket,
790                               &io_handle->messages[packet]->header,
791                               &write_data_finish_cb,
792                               io_handle);
793       packet++;
794     }
795
796   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
797     socket->retransmission_timeout_task_id = 
798       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
799                                     (GNUNET_TIME_UNIT_SECONDS, 5),
800                                     &retransmission_timeout_task,
801                                     socket);
802 }
803
804
805 /**
806  * Task for calling the read processor
807  *
808  * @param cls the socket
809  * @param tc the task context
810  */
811 static void
812 call_read_processor (void *cls,
813                           const struct GNUNET_SCHEDULER_TaskContext *tc)
814 {
815   struct GNUNET_STREAM_Socket *socket = cls;
816   size_t read_size;
817   size_t valid_read_size;
818   unsigned int packet;
819   uint32_t sequence_increase;
820   uint32_t offset_increase;
821
822   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
823   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
824     return;
825
826   GNUNET_assert (NULL != socket->read_handle);
827   GNUNET_assert (NULL != socket->read_handle->proc);
828
829   /* Check the bitmap for any holes */
830   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
831     {
832       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
833                                              packet))
834         break;
835     }
836   /* We only call read processor if we have the first packet */
837   GNUNET_assert (0 < packet);
838
839   valid_read_size = 
840     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
841
842   GNUNET_assert (0 != valid_read_size);
843
844   /* Cancel the read_io_timeout_task */
845   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
846   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
847
848   /* Call the data processor */
849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850               "%x: Calling read processor\n",
851               socket->our_id);
852   read_size = 
853     socket->read_handle->proc (socket->read_handle->proc_cls,
854                                socket->status,
855                                socket->receive_buffer + socket->copy_offset,
856                                valid_read_size);
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "%x: Read processor read %d bytes\n",
859               socket->our_id,
860               read_size);
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "%x: Read processor completed successfully\n",
863               socket->our_id);
864
865   /* Free the read handle */
866   GNUNET_free (socket->read_handle);
867   socket->read_handle = NULL;
868
869   GNUNET_assert (read_size <= valid_read_size);
870   socket->copy_offset += read_size;
871
872   /* Determine upto which packet we can remove from the buffer */
873   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
874     {
875       if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
876         { packet++; break; }
877       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
878         break;
879     }
880
881   /* If no packets can be removed we can't move the buffer */
882   if (0 == packet) return;
883
884   sequence_increase = packet;
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "%x: Sequence increase after read processor completion: %u\n",
887               socket->our_id,
888               sequence_increase);
889
890   /* Shift the data in the receive buffer */
891   memmove (socket->receive_buffer,
892            socket->receive_buffer 
893            + socket->receive_buffer_boundaries[sequence_increase-1],
894            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
895   
896   /* Shift the bitmap */
897   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
898   
899   /* Set read_sequence_number */
900   socket->read_sequence_number += sequence_increase;
901   
902   /* Set read_offset */
903   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
904   socket->read_offset += offset_increase;
905
906   /* Fix copy_offset */
907   GNUNET_assert (offset_increase <= socket->copy_offset);
908   socket->copy_offset -= offset_increase;
909   
910   /* Fix relative boundaries */
911   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
912     {
913       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
914         {
915           socket->receive_buffer_boundaries[packet] = 
916             socket->receive_buffer_boundaries[packet + sequence_increase] 
917             - offset_increase;
918         }
919       else
920         socket->receive_buffer_boundaries[packet] = 0;
921     }
922 }
923
924
925 /**
926  * Cancels the existing read io handle
927  *
928  * @param cls the closure from the SCHEDULER call
929  * @param tc the task context
930  */
931 static void
932 read_io_timeout (void *cls, 
933                 const struct GNUNET_SCHEDULER_TaskContext *tc)
934 {
935   struct GNUNET_STREAM_Socket *socket = cls;
936   GNUNET_STREAM_DataProcessor proc;
937   void *proc_cls;
938
939   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
940   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
941   {
942     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943                 "%x: Read task timedout - Cancelling it\n",
944                 socket->our_id);
945     GNUNET_SCHEDULER_cancel (socket->read_task_id);
946     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
947   }
948   GNUNET_assert (NULL != socket->read_handle);
949   proc = socket->read_handle->proc;
950   proc_cls = socket->read_handle->proc_cls;
951
952   GNUNET_free (socket->read_handle);
953   socket->read_handle = NULL;
954   /* Call the read processor to signal timeout */
955   proc (proc_cls,
956         GNUNET_STREAM_TIMEOUT,
957         NULL,
958         0);
959 }
960
961
962 /**
963  * Handler for DATA messages; Same for both client and server
964  *
965  * @param socket the socket through which the ack was received
966  * @param tunnel connection to the other end
967  * @param sender who sent the message
968  * @param msg the data message
969  * @param atsi performance data for the connection
970  * @return GNUNET_OK to keep the connection open,
971  *         GNUNET_SYSERR to close it (signal serious error)
972  */
973 static int
974 handle_data (struct GNUNET_STREAM_Socket *socket,
975              struct GNUNET_MESH_Tunnel *tunnel,
976              const struct GNUNET_PeerIdentity *sender,
977              const struct GNUNET_STREAM_DataMessage *msg,
978              const struct GNUNET_ATS_Information*atsi)
979 {
980   const void *payload;
981   uint32_t bytes_needed;
982   uint32_t relative_offset;
983   uint32_t relative_sequence_number;
984   uint16_t size;
985
986   size = htons (msg->header.header.size);
987   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
988     {
989       GNUNET_break_op (0);
990       return GNUNET_SYSERR;
991     }
992
993   if (GNUNET_PEER_search (sender) != socket->other_peer)
994     {
995       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996                   "%x: Received DATA from non-confirming peer\n",
997                   socket->our_id);
998       return GNUNET_YES;
999     }
1000
1001   switch (socket->state)
1002     {
1003     case STATE_ESTABLISHED:
1004     case STATE_TRANSMIT_CLOSED:
1005     case STATE_TRANSMIT_CLOSE_WAIT:
1006       
1007       /* check if the message's sequence number is in the range we are
1008          expecting */
1009       relative_sequence_number = 
1010         ntohl (msg->sequence_number) - socket->read_sequence_number;
1011       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1012         {
1013           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014                       "%x: Ignoring received message with sequence number %u\n",
1015                       socket->our_id,
1016                       ntohl (msg->sequence_number));
1017           /* Start ACK sending task if one is not already present */
1018           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1019             {
1020               socket->ack_task_id = 
1021                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1022                                               (msg->ack_deadline),
1023                                               &ack_task,
1024                                               socket);
1025             }
1026           return GNUNET_YES;
1027         }
1028       
1029       /* Check if we have already seen this message */
1030       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1031                                               relative_sequence_number))
1032         {
1033           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034                       "%x: Ignoring already received message with sequence "
1035                       "number %u\n",
1036                       socket->our_id,
1037                       ntohl (msg->sequence_number));
1038           /* Start ACK sending task if one is not already present */
1039           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1040             {
1041               socket->ack_task_id = 
1042                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1043                                               (msg->ack_deadline),
1044                                               &ack_task,
1045                                               socket);
1046             }
1047           return GNUNET_YES;
1048         }
1049
1050       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1051                   "%x: Receiving DATA with sequence number: %u and size: %d "
1052                   "from %x\n",
1053                   socket->our_id,
1054                   ntohl (msg->sequence_number),
1055                   ntohs (msg->header.header.size),
1056                   socket->other_peer);
1057
1058       /* Check if we have to allocate the buffer */
1059       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1060       relative_offset = ntohl (msg->offset) - socket->read_offset;
1061       bytes_needed = relative_offset + size;
1062       if (bytes_needed > socket->receive_buffer_size)
1063         {
1064           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1065             {
1066               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1067                                                        bytes_needed);
1068               socket->receive_buffer_size = bytes_needed;
1069             }
1070           else
1071             {
1072               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073                           "%x: Cannot accommodate packet %d as buffer is",
1074                           "full\n",
1075                           socket->our_id,
1076                           ntohl (msg->sequence_number));
1077               return GNUNET_YES;
1078             }
1079         }
1080       
1081       /* Copy Data to buffer */
1082       payload = &msg[1];
1083       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1084       memcpy (socket->receive_buffer + relative_offset,
1085               payload,
1086               size);
1087       socket->receive_buffer_boundaries[relative_sequence_number] = 
1088         relative_offset + size;
1089       
1090       /* Modify the ACK bitmap */
1091       ackbitmap_modify_bit (&socket->ack_bitmap,
1092                             relative_sequence_number,
1093                             GNUNET_YES);
1094
1095       /* Start ACK sending task if one is not already present */
1096       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1097        {
1098          socket->ack_task_id = 
1099            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1100                                          (msg->ack_deadline),
1101                                          &ack_task,
1102                                          socket);
1103        }
1104
1105       if ((NULL != socket->read_handle) /* A read handle is waiting */
1106           /* There is no current read task */
1107           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1108           /* We have the first packet */
1109           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1110                                                  0)))
1111         {
1112           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                       "%x: Scheduling read processor\n",
1114                       socket->our_id);
1115
1116           socket->read_task_id = 
1117             GNUNET_SCHEDULER_add_now (&call_read_processor,
1118                                       socket);
1119         }
1120       
1121       break;
1122
1123     default:
1124       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125                   "%x: Received data message when it cannot be handled\n",
1126                   socket->our_id);
1127       break;
1128     }
1129   return GNUNET_YES;
1130 }
1131
1132 /**
1133  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1134  *
1135  * @param cls the socket (set from GNUNET_MESH_connect)
1136  * @param tunnel connection to the other end
1137  * @param tunnel_ctx place to store local state associated with the tunnel
1138  * @param sender who sent the message
1139  * @param message the actual message
1140  * @param atsi performance data for the connection
1141  * @return GNUNET_OK to keep the connection open,
1142  *         GNUNET_SYSERR to close it (signal serious error)
1143  */
1144 static int
1145 client_handle_data (void *cls,
1146              struct GNUNET_MESH_Tunnel *tunnel,
1147              void **tunnel_ctx,
1148              const struct GNUNET_PeerIdentity *sender,
1149              const struct GNUNET_MessageHeader *message,
1150              const struct GNUNET_ATS_Information*atsi)
1151 {
1152   struct GNUNET_STREAM_Socket *socket = cls;
1153
1154   return handle_data (socket, 
1155                       tunnel, 
1156                       sender, 
1157                       (const struct GNUNET_STREAM_DataMessage *) message, 
1158                       atsi);
1159 }
1160
1161
1162 /**
1163  * Callback to set state to ESTABLISHED
1164  *
1165  * @param cls the closure from queue_message FIXME: document
1166  * @param socket the socket to requiring state change
1167  */
1168 static void
1169 set_state_established (void *cls,
1170                        struct GNUNET_STREAM_Socket *socket)
1171 {
1172   struct GNUNET_PeerIdentity initiator_pid;
1173
1174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1175               "%x: Attaining ESTABLISHED state\n",
1176               socket->our_id);
1177   socket->write_offset = 0;
1178   socket->read_offset = 0;
1179   socket->state = STATE_ESTABLISHED;
1180   /* FIXME: What if listen_cb is NULL */
1181   if (NULL != socket->lsocket)
1182     {
1183       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1184       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185                   "%x: Calling listen callback\n",
1186                   socket->our_id);
1187       if (GNUNET_SYSERR == 
1188           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1189                                       socket,
1190                                       &initiator_pid))
1191         {
1192           socket->state = STATE_CLOSED;
1193           /* FIXME: We should close in a decent way */
1194           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1195           GNUNET_free (socket);
1196         }
1197     }
1198   else if (socket->open_cb)
1199     socket->open_cb (socket->open_cls, socket);
1200 }
1201
1202
1203 /**
1204  * Callback to set state to HELLO_WAIT
1205  *
1206  * @param cls the closure from queue_message
1207  * @param socket the socket to requiring state change
1208  */
1209 static void
1210 set_state_hello_wait (void *cls,
1211                       struct GNUNET_STREAM_Socket *socket)
1212 {
1213   GNUNET_assert (STATE_INIT == socket->state);
1214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1215               "%x: Attaining HELLO_WAIT state\n",
1216               socket->our_id);
1217   socket->state = STATE_HELLO_WAIT;
1218 }
1219
1220
1221 /**
1222  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1223  *
1224  * @param cls the socket (set from GNUNET_MESH_connect)
1225  * @param tunnel connection to the other end
1226  * @param tunnel_ctx this is NULL
1227  * @param sender who sent the message
1228  * @param message the actual message
1229  * @param atsi performance data for the connection
1230  * @return GNUNET_OK to keep the connection open,
1231  *         GNUNET_SYSERR to close it (signal serious error)
1232  */
1233 static int
1234 client_handle_hello_ack (void *cls,
1235                          struct GNUNET_MESH_Tunnel *tunnel,
1236                          void **tunnel_ctx,
1237                          const struct GNUNET_PeerIdentity *sender,
1238                          const struct GNUNET_MessageHeader *message,
1239                          const struct GNUNET_ATS_Information*atsi)
1240 {
1241   struct GNUNET_STREAM_Socket *socket = cls;
1242   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1243   struct GNUNET_STREAM_HelloAckMessage *reply;
1244
1245   if (GNUNET_PEER_search (sender) != socket->other_peer)
1246     {
1247       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1248                   "%x: Received HELLO_ACK from non-confirming peer\n",
1249                   socket->our_id);
1250       return GNUNET_YES;
1251     }
1252   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1254               "%x: Received HELLO_ACK from %x\n",
1255               socket->our_id,
1256               socket->other_peer);
1257
1258   GNUNET_assert (socket->tunnel == tunnel);
1259   switch (socket->state)
1260   {
1261   case STATE_HELLO_WAIT:
1262     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1263     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264                 "%x: Read sequence number %u\n",
1265                 socket->our_id,
1266                 (unsigned int) socket->read_sequence_number);
1267     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1268     /* Get the random sequence number */
1269     socket->write_sequence_number = 
1270       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1271       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272                   "%x: Generated write sequence number %u\n",
1273                   socket->our_id,
1274                   (unsigned int) socket->write_sequence_number);
1275     reply = 
1276       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1277     reply->header.header.size = 
1278       htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1279     reply->header.header.type = 
1280       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1281     reply->sequence_number = htonl (socket->write_sequence_number);
1282     reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1283     queue_message (socket, 
1284                    &reply->header, 
1285                    &set_state_established, 
1286                    NULL);      
1287     return GNUNET_OK;
1288   case STATE_ESTABLISHED:
1289   case STATE_RECEIVE_CLOSE_WAIT:
1290     // call statistics (# ACKs ignored++)
1291     return GNUNET_OK;
1292   case STATE_INIT:
1293   default:
1294     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1296                 socket->our_id,
1297                 socket->other_peer,
1298                 socket->state);
1299     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1300     return GNUNET_SYSERR;
1301   }
1302
1303 }
1304
1305
1306 /**
1307  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1308  *
1309  * @param cls the socket (set from GNUNET_MESH_connect)
1310  * @param tunnel connection to the other end
1311  * @param tunnel_ctx this is NULL
1312  * @param sender who sent the message
1313  * @param message the actual message
1314  * @param atsi performance data for the connection
1315  * @return GNUNET_OK to keep the connection open,
1316  *         GNUNET_SYSERR to close it (signal serious error)
1317  */
1318 static int
1319 client_handle_reset (void *cls,
1320                      struct GNUNET_MESH_Tunnel *tunnel,
1321                      void **tunnel_ctx,
1322                      const struct GNUNET_PeerIdentity *sender,
1323                      const struct GNUNET_MessageHeader *message,
1324                      const struct GNUNET_ATS_Information*atsi)
1325 {
1326   struct GNUNET_STREAM_Socket *socket = cls;
1327
1328   return GNUNET_OK;
1329 }
1330
1331
1332 /**
1333  * Common message handler for handling TRANSMIT_CLOSE messages
1334  *
1335  * @param socket the socket through which the ack was received
1336  * @param tunnel connection to the other end
1337  * @param sender who sent the message
1338  * @param msg the transmit close message
1339  * @param atsi performance data for the connection
1340  * @return GNUNET_OK to keep the connection open,
1341  *         GNUNET_SYSERR to close it (signal serious error)
1342  */
1343 static int
1344 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1345                        struct GNUNET_MESH_Tunnel *tunnel,
1346                        const struct GNUNET_PeerIdentity *sender,
1347                        const struct GNUNET_STREAM_MessageHeader *msg,
1348                        const struct GNUNET_ATS_Information*atsi)
1349 {
1350   struct GNUNET_STREAM_MessageHeader *reply;
1351
1352   switch (socket->state)
1353     {
1354     case STATE_ESTABLISHED:
1355       socket->state = STATE_RECEIVE_CLOSED;
1356
1357       /* Send TRANSMIT_CLOSE_ACK */
1358       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1359       reply->header.type = 
1360         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1361       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1362       queue_message (socket, reply, NULL, NULL);
1363       break;
1364
1365     default:
1366       /* FIXME: Call statistics? */
1367       break;
1368     }
1369   return GNUNET_YES;
1370 }
1371
1372
1373 /**
1374  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1375  *
1376  * @param cls the socket (set from GNUNET_MESH_connect)
1377  * @param tunnel connection to the other end
1378  * @param tunnel_ctx this is NULL
1379  * @param sender who sent the message
1380  * @param message the actual message
1381  * @param atsi performance data for the connection
1382  * @return GNUNET_OK to keep the connection open,
1383  *         GNUNET_SYSERR to close it (signal serious error)
1384  */
1385 static int
1386 client_handle_transmit_close (void *cls,
1387                               struct GNUNET_MESH_Tunnel *tunnel,
1388                               void **tunnel_ctx,
1389                               const struct GNUNET_PeerIdentity *sender,
1390                               const struct GNUNET_MessageHeader *message,
1391                               const struct GNUNET_ATS_Information*atsi)
1392 {
1393   struct GNUNET_STREAM_Socket *socket = cls;
1394   
1395   return handle_transmit_close (socket,
1396                                 tunnel,
1397                                 sender,
1398                                 (struct GNUNET_STREAM_MessageHeader *)message,
1399                                 atsi);
1400 }
1401
1402
1403 /**
1404  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1405  *
1406  * @param cls the socket (set from GNUNET_MESH_connect)
1407  * @param tunnel connection to the other end
1408  * @param tunnel_ctx this is NULL
1409  * @param sender who sent the message
1410  * @param message the actual message
1411  * @param atsi performance data for the connection
1412  * @return GNUNET_OK to keep the connection open,
1413  *         GNUNET_SYSERR to close it (signal serious error)
1414  */
1415 static int
1416 client_handle_transmit_close_ack (void *cls,
1417                                   struct GNUNET_MESH_Tunnel *tunnel,
1418                                   void **tunnel_ctx,
1419                                   const struct GNUNET_PeerIdentity *sender,
1420                                   const struct GNUNET_MessageHeader *message,
1421                                   const struct GNUNET_ATS_Information*atsi)
1422 {
1423   struct GNUNET_STREAM_Socket *socket = cls;
1424
1425   return GNUNET_OK;
1426 }
1427
1428
1429 /**
1430  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1431  *
1432  * @param cls the socket (set from GNUNET_MESH_connect)
1433  * @param tunnel connection to the other end
1434  * @param tunnel_ctx this is NULL
1435  * @param sender who sent the message
1436  * @param message the actual message
1437  * @param atsi performance data for the connection
1438  * @return GNUNET_OK to keep the connection open,
1439  *         GNUNET_SYSERR to close it (signal serious error)
1440  */
1441 static int
1442 client_handle_receive_close (void *cls,
1443                              struct GNUNET_MESH_Tunnel *tunnel,
1444                              void **tunnel_ctx,
1445                              const struct GNUNET_PeerIdentity *sender,
1446                              const struct GNUNET_MessageHeader *message,
1447                              const struct GNUNET_ATS_Information*atsi)
1448 {
1449   struct GNUNET_STREAM_Socket *socket = cls;
1450
1451   return GNUNET_OK;
1452 }
1453
1454
1455 /**
1456  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1457  *
1458  * @param cls the socket (set from GNUNET_MESH_connect)
1459  * @param tunnel connection to the other end
1460  * @param tunnel_ctx this is NULL
1461  * @param sender who sent the message
1462  * @param message the actual message
1463  * @param atsi performance data for the connection
1464  * @return GNUNET_OK to keep the connection open,
1465  *         GNUNET_SYSERR to close it (signal serious error)
1466  */
1467 static int
1468 client_handle_receive_close_ack (void *cls,
1469                                  struct GNUNET_MESH_Tunnel *tunnel,
1470                                  void **tunnel_ctx,
1471                                  const struct GNUNET_PeerIdentity *sender,
1472                                  const struct GNUNET_MessageHeader *message,
1473                                  const struct GNUNET_ATS_Information*atsi)
1474 {
1475   struct GNUNET_STREAM_Socket *socket = cls;
1476
1477   return GNUNET_OK;
1478 }
1479
1480
1481 /**
1482  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1483  *
1484  * @param cls the socket (set from GNUNET_MESH_connect)
1485  * @param tunnel connection to the other end
1486  * @param tunnel_ctx this is NULL
1487  * @param sender who sent the message
1488  * @param message the actual message
1489  * @param atsi performance data for the connection
1490  * @return GNUNET_OK to keep the connection open,
1491  *         GNUNET_SYSERR to close it (signal serious error)
1492  */
1493 static int
1494 client_handle_close (void *cls,
1495                      struct GNUNET_MESH_Tunnel *tunnel,
1496                      void **tunnel_ctx,
1497                      const struct GNUNET_PeerIdentity *sender,
1498                      const struct GNUNET_MessageHeader *message,
1499                      const struct GNUNET_ATS_Information*atsi)
1500 {
1501   struct GNUNET_STREAM_Socket *socket = cls;
1502
1503   return GNUNET_OK;
1504 }
1505
1506
1507 /**
1508  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1509  *
1510  * @param cls the socket (set from GNUNET_MESH_connect)
1511  * @param tunnel connection to the other end
1512  * @param tunnel_ctx this is NULL
1513  * @param sender who sent the message
1514  * @param message the actual message
1515  * @param atsi performance data for the connection
1516  * @return GNUNET_OK to keep the connection open,
1517  *         GNUNET_SYSERR to close it (signal serious error)
1518  */
1519 static int
1520 client_handle_close_ack (void *cls,
1521                          struct GNUNET_MESH_Tunnel *tunnel,
1522                          void **tunnel_ctx,
1523                          const struct GNUNET_PeerIdentity *sender,
1524                          const struct GNUNET_MessageHeader *message,
1525                          const struct GNUNET_ATS_Information*atsi)
1526 {
1527   struct GNUNET_STREAM_Socket *socket = cls;
1528
1529   return GNUNET_OK;
1530 }
1531
1532 /*****************************/
1533 /* Server's Message Handlers */
1534 /*****************************/
1535
1536 /**
1537  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1538  *
1539  * @param cls the closure
1540  * @param tunnel connection to the other end
1541  * @param tunnel_ctx the socket
1542  * @param sender who sent the message
1543  * @param message the actual message
1544  * @param atsi performance data for the connection
1545  * @return GNUNET_OK to keep the connection open,
1546  *         GNUNET_SYSERR to close it (signal serious error)
1547  */
1548 static int
1549 server_handle_data (void *cls,
1550                     struct GNUNET_MESH_Tunnel *tunnel,
1551                     void **tunnel_ctx,
1552                     const struct GNUNET_PeerIdentity *sender,
1553                     const struct GNUNET_MessageHeader *message,
1554                     const struct GNUNET_ATS_Information*atsi)
1555 {
1556   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1557
1558   return handle_data (socket,
1559                       tunnel,
1560                       sender,
1561                       (const struct GNUNET_STREAM_DataMessage *)message,
1562                       atsi);
1563 }
1564
1565
1566 /**
1567  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1568  *
1569  * @param cls the closure
1570  * @param tunnel connection to the other end
1571  * @param tunnel_ctx the socket
1572  * @param sender who sent the message
1573  * @param message the actual message
1574  * @param atsi performance data for the connection
1575  * @return GNUNET_OK to keep the connection open,
1576  *         GNUNET_SYSERR to close it (signal serious error)
1577  */
1578 static int
1579 server_handle_hello (void *cls,
1580                      struct GNUNET_MESH_Tunnel *tunnel,
1581                      void **tunnel_ctx,
1582                      const struct GNUNET_PeerIdentity *sender,
1583                      const struct GNUNET_MessageHeader *message,
1584                      const struct GNUNET_ATS_Information*atsi)
1585 {
1586   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1587   struct GNUNET_STREAM_HelloAckMessage *reply;
1588
1589   if (GNUNET_PEER_search (sender) != socket->other_peer)
1590     {
1591       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1592                   "%x: Received HELLO from non-confirming peer\n",
1593                   socket->our_id);
1594       return GNUNET_YES;
1595     }
1596
1597   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1598                  ntohs (message->type));
1599   GNUNET_assert (socket->tunnel == tunnel);
1600   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601               "%x: Received HELLO from %x\n", 
1602               socket->our_id,
1603               socket->other_peer);
1604
1605   if (STATE_INIT == socket->state)
1606     {
1607       /* Get the random sequence number */
1608       socket->write_sequence_number = 
1609         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1610       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1611                   "%x: Generated write sequence number %u\n",
1612                   socket->our_id,
1613                   (unsigned int) socket->write_sequence_number);
1614       reply = 
1615         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1616       reply->header.header.size = 
1617         htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1618       reply->header.header.type = 
1619         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1620       reply->sequence_number = htonl (socket->write_sequence_number);
1621       reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1622       queue_message (socket, 
1623                      &reply->header,
1624                      &set_state_hello_wait, 
1625                      NULL);
1626     }
1627   else
1628     {
1629       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1630                   "Client sent HELLO when in state %d\n", socket->state);
1631       /* FIXME: Send RESET? */
1632       
1633     }
1634   return GNUNET_OK;
1635 }
1636
1637
1638 /**
1639  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1640  *
1641  * @param cls the closure
1642  * @param tunnel connection to the other end
1643  * @param tunnel_ctx the socket
1644  * @param sender who sent the message
1645  * @param message the actual message
1646  * @param atsi performance data for the connection
1647  * @return GNUNET_OK to keep the connection open,
1648  *         GNUNET_SYSERR to close it (signal serious error)
1649  */
1650 static int
1651 server_handle_hello_ack (void *cls,
1652                          struct GNUNET_MESH_Tunnel *tunnel,
1653                          void **tunnel_ctx,
1654                          const struct GNUNET_PeerIdentity *sender,
1655                          const struct GNUNET_MessageHeader *message,
1656                          const struct GNUNET_ATS_Information*atsi)
1657 {
1658   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1659   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1660
1661   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1662                  ntohs (message->type));
1663   GNUNET_assert (socket->tunnel == tunnel);
1664   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1665   if (STATE_HELLO_WAIT == socket->state)
1666     {
1667       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1668                   "%x: Received HELLO_ACK from %x\n",
1669                   socket->our_id,
1670                   socket->other_peer);
1671       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1672       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673                   "%x: Read sequence number %u\n",
1674                   socket->our_id,
1675                   (unsigned int) socket->read_sequence_number);
1676       socket->receiver_window_available = 
1677         ntohl (ack_message->receiver_window_size);
1678       /* Attain ESTABLISHED state */
1679       set_state_established (NULL, socket);
1680     }
1681   else
1682     {
1683       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1684                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1685       /* FIXME: Send RESET? */
1686       
1687     }
1688   return GNUNET_OK;
1689 }
1690
1691
1692 /**
1693  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1694  *
1695  * @param cls the closure
1696  * @param tunnel connection to the other end
1697  * @param tunnel_ctx the socket
1698  * @param sender who sent the message
1699  * @param message the actual message
1700  * @param atsi performance data for the connection
1701  * @return GNUNET_OK to keep the connection open,
1702  *         GNUNET_SYSERR to close it (signal serious error)
1703  */
1704 static int
1705 server_handle_reset (void *cls,
1706                      struct GNUNET_MESH_Tunnel *tunnel,
1707                      void **tunnel_ctx,
1708                      const struct GNUNET_PeerIdentity *sender,
1709                      const struct GNUNET_MessageHeader *message,
1710                      const struct GNUNET_ATS_Information*atsi)
1711 {
1712   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1713
1714   return GNUNET_OK;
1715 }
1716
1717
1718 /**
1719  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1720  *
1721  * @param cls the closure
1722  * @param tunnel connection to the other end
1723  * @param tunnel_ctx the socket
1724  * @param sender who sent the message
1725  * @param message the actual message
1726  * @param atsi performance data for the connection
1727  * @return GNUNET_OK to keep the connection open,
1728  *         GNUNET_SYSERR to close it (signal serious error)
1729  */
1730 static int
1731 server_handle_transmit_close (void *cls,
1732                               struct GNUNET_MESH_Tunnel *tunnel,
1733                               void **tunnel_ctx,
1734                               const struct GNUNET_PeerIdentity *sender,
1735                               const struct GNUNET_MessageHeader *message,
1736                               const struct GNUNET_ATS_Information*atsi)
1737 {
1738   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1739
1740   return handle_transmit_close (socket,
1741                                 tunnel,
1742                                 sender,
1743                                 (struct GNUNET_STREAM_MessageHeader *)message,
1744                                 atsi);
1745 }
1746
1747
1748 /**
1749  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1750  *
1751  * @param cls the closure
1752  * @param tunnel connection to the other end
1753  * @param tunnel_ctx the socket
1754  * @param sender who sent the message
1755  * @param message the actual message
1756  * @param atsi performance data for the connection
1757  * @return GNUNET_OK to keep the connection open,
1758  *         GNUNET_SYSERR to close it (signal serious error)
1759  */
1760 static int
1761 server_handle_transmit_close_ack (void *cls,
1762                                   struct GNUNET_MESH_Tunnel *tunnel,
1763                                   void **tunnel_ctx,
1764                                   const struct GNUNET_PeerIdentity *sender,
1765                                   const struct GNUNET_MessageHeader *message,
1766                                   const struct GNUNET_ATS_Information*atsi)
1767 {
1768   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1769
1770   return GNUNET_OK;
1771 }
1772
1773
1774 /**
1775  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1776  *
1777  * @param cls the closure
1778  * @param tunnel connection to the other end
1779  * @param tunnel_ctx the socket
1780  * @param sender who sent the message
1781  * @param message the actual message
1782  * @param atsi performance data for the connection
1783  * @return GNUNET_OK to keep the connection open,
1784  *         GNUNET_SYSERR to close it (signal serious error)
1785  */
1786 static int
1787 server_handle_receive_close (void *cls,
1788                              struct GNUNET_MESH_Tunnel *tunnel,
1789                              void **tunnel_ctx,
1790                              const struct GNUNET_PeerIdentity *sender,
1791                              const struct GNUNET_MessageHeader *message,
1792                              const struct GNUNET_ATS_Information*atsi)
1793 {
1794   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1795
1796   return GNUNET_OK;
1797 }
1798
1799
1800 /**
1801  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1802  *
1803  * @param cls the closure
1804  * @param tunnel connection to the other end
1805  * @param tunnel_ctx the socket
1806  * @param sender who sent the message
1807  * @param message the actual message
1808  * @param atsi performance data for the connection
1809  * @return GNUNET_OK to keep the connection open,
1810  *         GNUNET_SYSERR to close it (signal serious error)
1811  */
1812 static int
1813 server_handle_receive_close_ack (void *cls,
1814                                  struct GNUNET_MESH_Tunnel *tunnel,
1815                                  void **tunnel_ctx,
1816                                  const struct GNUNET_PeerIdentity *sender,
1817                                  const struct GNUNET_MessageHeader *message,
1818                                  const struct GNUNET_ATS_Information*atsi)
1819 {
1820   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1821
1822   return GNUNET_OK;
1823 }
1824
1825
1826 /**
1827  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1828  *
1829  * @param cls the closure
1830  * @param tunnel connection to the other end
1831  * @param tunnel_ctx the socket
1832  * @param sender who sent the message
1833  * @param message the actual message
1834  * @param atsi performance data for the connection
1835  * @return GNUNET_OK to keep the connection open,
1836  *         GNUNET_SYSERR to close it (signal serious error)
1837  */
1838 static int
1839 server_handle_close (void *cls,
1840                      struct GNUNET_MESH_Tunnel *tunnel,
1841                      void **tunnel_ctx,
1842                      const struct GNUNET_PeerIdentity *sender,
1843                      const struct GNUNET_MessageHeader *message,
1844                      const struct GNUNET_ATS_Information*atsi)
1845 {
1846   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1847
1848   return GNUNET_OK;
1849 }
1850
1851
1852 /**
1853  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1854  *
1855  * @param cls the closure
1856  * @param tunnel connection to the other end
1857  * @param tunnel_ctx the socket
1858  * @param sender who sent the message
1859  * @param message the actual message
1860  * @param atsi performance data for the connection
1861  * @return GNUNET_OK to keep the connection open,
1862  *         GNUNET_SYSERR to close it (signal serious error)
1863  */
1864 static int
1865 server_handle_close_ack (void *cls,
1866                          struct GNUNET_MESH_Tunnel *tunnel,
1867                          void **tunnel_ctx,
1868                          const struct GNUNET_PeerIdentity *sender,
1869                          const struct GNUNET_MessageHeader *message,
1870                          const struct GNUNET_ATS_Information*atsi)
1871 {
1872   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1873
1874   return GNUNET_OK;
1875 }
1876
1877
1878 /**
1879  * Message Handler for mesh
1880  *
1881  * @param socket the socket through which the ack was received
1882  * @param tunnel connection to the other end
1883  * @param sender who sent the message
1884  * @param ack the acknowledgment message
1885  * @param atsi performance data for the connection
1886  * @return GNUNET_OK to keep the connection open,
1887  *         GNUNET_SYSERR to close it (signal serious error)
1888  */
1889 static int
1890 handle_ack (struct GNUNET_STREAM_Socket *socket,
1891             struct GNUNET_MESH_Tunnel *tunnel,
1892             const struct GNUNET_PeerIdentity *sender,
1893             const struct GNUNET_STREAM_AckMessage *ack,
1894             const struct GNUNET_ATS_Information*atsi)
1895 {
1896   unsigned int packet;
1897   int need_retransmission;
1898   
1899
1900   if (GNUNET_PEER_search (sender) != socket->other_peer)
1901     {
1902       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1903                   "%x: Received ACK from non-confirming peer\n",
1904                   socket->our_id);
1905       return GNUNET_YES;
1906     }
1907
1908   switch (socket->state)
1909     {
1910     case (STATE_ESTABLISHED):
1911       if (NULL == socket->write_handle)
1912         {
1913           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914                       "%x: Received DATA_ACK when write_handle is NULL\n",
1915                       socket->our_id);
1916           return GNUNET_OK;
1917         }
1918       /* FIXME: increment in the base sequence number is breaking current flow
1919        */
1920       if (!((socket->write_sequence_number 
1921              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1922         {
1923           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1924                       "%x: Received DATA_ACK with unexpected base sequence "
1925                       "number\n",
1926                       socket->our_id);
1927           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
1929                       socket->our_id,
1930                       socket->write_sequence_number,
1931                       ntohl (ack->base_sequence_number));
1932           return GNUNET_OK;
1933         }
1934       /* FIXME: include the case when write_handle is cancelled - ignore the 
1935          acks */
1936
1937       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1938                   "%x: Received DATA_ACK from %x\n",
1939                   socket->our_id,
1940                   socket->other_peer);
1941       
1942       /* Cancel the retransmission task */
1943       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1944         {
1945           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1946           socket->retransmission_timeout_task_id = 
1947             GNUNET_SCHEDULER_NO_TASK;
1948         }
1949
1950       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1951         {
1952           if (NULL == socket->write_handle->messages[packet]) break;
1953           if (ntohl (ack->base_sequence_number)
1954               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
1955             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1956                                   packet,
1957                                   GNUNET_YES);
1958           else
1959             if (GNUNET_YES == 
1960                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
1961                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
1962                                       - ntohl (ack->base_sequence_number)))
1963               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1964                                     packet,
1965                                     GNUNET_YES);
1966         }
1967
1968       /* Update the receive window remaining
1969        FIXME : Should update with the value from a data ack with greater
1970        sequence number */
1971       socket->receiver_window_available = 
1972         ntohl (ack->receive_window_remaining);
1973
1974       /* Check if we have received all acknowledgements */
1975       need_retransmission = GNUNET_NO;
1976       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1977         {
1978           if (NULL == socket->write_handle->messages[packet]) break;
1979           if (GNUNET_YES != ackbitmap_is_bit_set 
1980               (&socket->write_handle->ack_bitmap,packet))
1981             {
1982               need_retransmission = GNUNET_YES;
1983               break;
1984             }
1985         }
1986       if (GNUNET_YES == need_retransmission)
1987         {
1988           write_data (socket);
1989         }
1990       else      /* We have to call the write continuation callback now */
1991         {
1992           /* Free the packets */
1993           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1994             {
1995               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1996             }
1997           if (NULL != socket->write_handle->write_cont)
1998             socket->write_handle->write_cont
1999               (socket->write_handle->write_cont_cls,
2000                socket->status,
2001                socket->write_handle->size);
2002           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2003                       "%x: Write completion callback completed\n",
2004                       socket->our_id);
2005           /* We are done with the write handle - Freeing it */
2006           GNUNET_free (socket->write_handle);
2007           socket->write_handle = NULL;
2008         }
2009       break;
2010     default:
2011       break;
2012     }
2013   return GNUNET_OK;
2014 }
2015
2016
2017 /**
2018  * Message Handler for mesh
2019  *
2020  * @param cls the 'struct GNUNET_STREAM_Socket'
2021  * @param tunnel connection to the other end
2022  * @param tunnel_ctx unused
2023  * @param sender who sent the message
2024  * @param message the actual message
2025  * @param atsi performance data for the connection
2026  * @return GNUNET_OK to keep the connection open,
2027  *         GNUNET_SYSERR to close it (signal serious error)
2028  */
2029 static int
2030 client_handle_ack (void *cls,
2031                    struct GNUNET_MESH_Tunnel *tunnel,
2032                    void **tunnel_ctx,
2033                    const struct GNUNET_PeerIdentity *sender,
2034                    const struct GNUNET_MessageHeader *message,
2035                    const struct GNUNET_ATS_Information*atsi)
2036 {
2037   struct GNUNET_STREAM_Socket *socket = cls;
2038   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2039  
2040   return handle_ack (socket, tunnel, sender, ack, atsi);
2041 }
2042
2043
2044 /**
2045  * Message Handler for mesh
2046  *
2047  * @param cls the server's listen socket
2048  * @param tunnel connection to the other end
2049  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2050  * @param sender who sent the message
2051  * @param message the actual message
2052  * @param atsi performance data for the connection
2053  * @return GNUNET_OK to keep the connection open,
2054  *         GNUNET_SYSERR to close it (signal serious error)
2055  */
2056 static int
2057 server_handle_ack (void *cls,
2058                    struct GNUNET_MESH_Tunnel *tunnel,
2059                    void **tunnel_ctx,
2060                    const struct GNUNET_PeerIdentity *sender,
2061                    const struct GNUNET_MessageHeader *message,
2062                    const struct GNUNET_ATS_Information*atsi)
2063 {
2064   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2065   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2066  
2067   return handle_ack (socket, tunnel, sender, ack, atsi);
2068 }
2069
2070
2071 /**
2072  * For client message handlers, the stream socket is in the
2073  * closure argument.
2074  */
2075 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2076   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2077   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2078    sizeof (struct GNUNET_STREAM_AckMessage) },
2079   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2080    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2081   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2082    sizeof (struct GNUNET_STREAM_MessageHeader)},
2083   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2084    sizeof (struct GNUNET_STREAM_MessageHeader)},
2085   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2086    sizeof (struct GNUNET_STREAM_MessageHeader)},
2087   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2088    sizeof (struct GNUNET_STREAM_MessageHeader)},
2089   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2090    sizeof (struct GNUNET_STREAM_MessageHeader)},
2091   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2092    sizeof (struct GNUNET_STREAM_MessageHeader)},
2093   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2094    sizeof (struct GNUNET_STREAM_MessageHeader)},
2095   {NULL, 0, 0}
2096 };
2097
2098
2099 /**
2100  * For server message handlers, the stream socket is in the
2101  * tunnel context, and the listen socket in the closure argument.
2102  */
2103 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2104   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2105   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2106    sizeof (struct GNUNET_STREAM_AckMessage) },
2107   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2108    sizeof (struct GNUNET_STREAM_MessageHeader)},
2109   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2110    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2111   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2112    sizeof (struct GNUNET_STREAM_MessageHeader)},
2113   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2114    sizeof (struct GNUNET_STREAM_MessageHeader)},
2115   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2116    sizeof (struct GNUNET_STREAM_MessageHeader)},
2117   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2118    sizeof (struct GNUNET_STREAM_MessageHeader)},
2119   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2120    sizeof (struct GNUNET_STREAM_MessageHeader)},
2121   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2122    sizeof (struct GNUNET_STREAM_MessageHeader)},
2123   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2124    sizeof (struct GNUNET_STREAM_MessageHeader)},
2125   {NULL, 0, 0}
2126 };
2127
2128
2129 /**
2130  * Function called when our target peer is connected to our tunnel
2131  *
2132  * @param cls the socket for which this tunnel is created
2133  * @param peer the peer identity of the target
2134  * @param atsi performance data for the connection
2135  */
2136 static void
2137 mesh_peer_connect_callback (void *cls,
2138                             const struct GNUNET_PeerIdentity *peer,
2139                             const struct GNUNET_ATS_Information * atsi)
2140 {
2141   struct GNUNET_STREAM_Socket *socket = cls;
2142   struct GNUNET_STREAM_MessageHeader *message;
2143   GNUNET_PEER_Id connected_peer;
2144
2145   connected_peer = GNUNET_PEER_search (peer);
2146   
2147   if (connected_peer != socket->other_peer)
2148     {
2149       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2150                   "%x: A peer which is not our target has connected",
2151                   "to our tunnel\n",
2152                   socket->our_id);
2153       return;
2154     }
2155   
2156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2157               "%x: Target peer %x connected\n", 
2158               socket->our_id,
2159               connected_peer);
2160   
2161   /* Set state to INIT */
2162   socket->state = STATE_INIT;
2163
2164   /* Send HELLO message */
2165   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2166   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2167   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2168   queue_message (socket,
2169                  message,
2170                  &set_state_hello_wait,
2171                  NULL);
2172
2173   /* Call open callback */
2174   if (NULL == socket->open_cb)
2175     {
2176       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2177                   "STREAM_open callback is NULL\n");
2178     }
2179 }
2180
2181
2182 /**
2183  * Function called when our target peer is disconnected from our tunnel
2184  *
2185  * @param cls the socket associated which this tunnel
2186  * @param peer the peer identity of the target
2187  */
2188 static void
2189 mesh_peer_disconnect_callback (void *cls,
2190                                const struct GNUNET_PeerIdentity *peer)
2191 {
2192
2193 }
2194
2195
2196 /**
2197  * Method called whenever a peer creates a tunnel to us
2198  *
2199  * @param cls closure
2200  * @param tunnel new handle to the tunnel
2201  * @param initiator peer that started the tunnel
2202  * @param atsi performance information for the tunnel
2203  * @return initial tunnel context for the tunnel
2204  *         (can be NULL -- that's not an error)
2205  */
2206 static void *
2207 new_tunnel_notify (void *cls,
2208                    struct GNUNET_MESH_Tunnel *tunnel,
2209                    const struct GNUNET_PeerIdentity *initiator,
2210                    const struct GNUNET_ATS_Information *atsi)
2211 {
2212   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2213   struct GNUNET_STREAM_Socket *socket;
2214
2215   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2216      from the same peer again until the socket is closed */
2217
2218   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2219   socket->other_peer = GNUNET_PEER_intern (initiator);
2220   socket->tunnel = tunnel;
2221   socket->session_id = 0;       /* FIXME */
2222   socket->state = STATE_INIT;
2223   socket->lsocket = lsocket;
2224   socket->our_id = lsocket->our_id;
2225   
2226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2227               "%x: Peer %x initiated tunnel to us\n", 
2228               socket->our_id,
2229               socket->other_peer);
2230   
2231   /* FIXME: Copy MESH handle from lsocket to socket */
2232   
2233   return socket;
2234 }
2235
2236
2237 /**
2238  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2239  * any associated state.  This function is NOT called if the client has
2240  * explicitly asked for the tunnel to be destroyed using
2241  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2242  * the tunnel.
2243  *
2244  * @param cls closure (set from GNUNET_MESH_connect)
2245  * @param tunnel connection to the other end (henceforth invalid)
2246  * @param tunnel_ctx place where local state associated
2247  *                   with the tunnel is stored
2248  */
2249 static void 
2250 tunnel_cleaner (void *cls,
2251                 const struct GNUNET_MESH_Tunnel *tunnel,
2252                 void *tunnel_ctx)
2253 {
2254   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2255   
2256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2257               "%x: Peer %x has terminated connection abruptly\n",
2258               socket->our_id,
2259               socket->other_peer);
2260
2261   socket->status = GNUNET_STREAM_SHUTDOWN;
2262
2263   /* Clear Transmit handles */
2264   if (NULL != socket->transmit_handle)
2265     {
2266       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2267       socket->transmit_handle = NULL;
2268     }
2269   socket->tunnel = NULL;
2270 }
2271
2272
2273 /*****************/
2274 /* API functions */
2275 /*****************/
2276
2277
2278 /**
2279  * Tries to open a stream to the target peer
2280  *
2281  * @param cfg configuration to use
2282  * @param target the target peer to which the stream has to be opened
2283  * @param app_port the application port number which uniquely identifies this
2284  *            stream
2285  * @param open_cb this function will be called after stream has be established 
2286  * @param open_cb_cls the closure for open_cb
2287  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2288  * @return if successful it returns the stream socket; NULL if stream cannot be
2289  *         opened 
2290  */
2291 struct GNUNET_STREAM_Socket *
2292 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2293                     const struct GNUNET_PeerIdentity *target,
2294                     GNUNET_MESH_ApplicationType app_port,
2295                     GNUNET_STREAM_OpenCallback open_cb,
2296                     void *open_cb_cls,
2297                     ...)
2298 {
2299   struct GNUNET_STREAM_Socket *socket;
2300   struct GNUNET_PeerIdentity own_peer_id;
2301   enum GNUNET_STREAM_Option option;
2302   va_list vargs;                /* Variable arguments */
2303
2304   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2305               "%s\n", __func__);
2306
2307   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2308   socket->other_peer = GNUNET_PEER_intern (target);
2309   socket->open_cb = open_cb;
2310   socket->open_cls = open_cb_cls;
2311   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2312   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2313   
2314   /* Set defaults */
2315   socket->retransmit_timeout = 
2316     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2317
2318   va_start (vargs, open_cb_cls); /* Parse variable args */
2319   do {
2320     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2321     switch (option)
2322       {
2323       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2324         /* Expect struct GNUNET_TIME_Relative */
2325         socket->retransmit_timeout = va_arg (vargs,
2326                                              struct GNUNET_TIME_Relative);
2327         break;
2328       case GNUNET_STREAM_OPTION_END:
2329         break;
2330       }
2331   } while (GNUNET_STREAM_OPTION_END != option);
2332   va_end (vargs);               /* End of variable args parsing */
2333   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2334                                       10,  /* QUEUE size as parameter? */
2335                                       socket, /* cls */
2336                                       NULL, /* No inbound tunnel handler */
2337                                       &tunnel_cleaner, /* FIXME: not required? */
2338                                       client_message_handlers,
2339                                       &app_port); /* We don't get inbound tunnels */
2340   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2341     {
2342       GNUNET_free (socket);
2343       return NULL;
2344     }
2345
2346   /* Now create the mesh tunnel to target */
2347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2348               "Creating MESH Tunnel\n");
2349   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2350                                               NULL, /* Tunnel context */
2351                                               &mesh_peer_connect_callback,
2352                                               &mesh_peer_disconnect_callback,
2353                                               socket);
2354   GNUNET_assert (NULL != socket->tunnel);
2355   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2356                                         target);
2357   
2358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2359               "%s() END\n", __func__);
2360   return socket;
2361 }
2362
2363
2364 /**
2365  * Shutdown the stream for reading or writing (man 2 shutdown).
2366  *
2367  * @param socket the stream socket
2368  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2369  */
2370 void
2371 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2372                         int how)
2373 {
2374   return;
2375 }
2376
2377
2378 /**
2379  * Closes the stream
2380  *
2381  * @param socket the stream socket
2382  */
2383 void
2384 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2385 {
2386   struct MessageQueue *head;
2387
2388   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2389     {
2390       /* socket closed with read task pending!? */
2391       GNUNET_break (0);
2392       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2393       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2394     }
2395   
2396   /* Terminate the ack'ing tasks if they are still present */
2397   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2398     {
2399       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2400       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2401     }
2402
2403   /* Clear Transmit handles */
2404   if (NULL != socket->transmit_handle)
2405     {
2406       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2407       socket->transmit_handle = NULL;
2408     }
2409
2410   /* Clear existing message queue */
2411   while (NULL != (head = socket->queue_head)) {
2412     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2413                                  socket->queue_tail,
2414                                  head);
2415     GNUNET_free (head->message);
2416     GNUNET_free (head);
2417   }
2418
2419   /* Close associated tunnel */
2420   if (NULL != socket->tunnel)
2421     {
2422       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2423       socket->tunnel = NULL;
2424     }
2425
2426   /* Close mesh connection */
2427   if (NULL != socket->mesh && NULL == socket->lsocket)
2428     {
2429       GNUNET_MESH_disconnect (socket->mesh);
2430       socket->mesh = NULL;
2431     }
2432   
2433   /* Release receive buffer */
2434   if (NULL != socket->receive_buffer)
2435     {
2436       GNUNET_free (socket->receive_buffer);
2437     }
2438
2439   GNUNET_free (socket);
2440 }
2441
2442
2443 /**
2444  * Listens for stream connections for a specific application ports
2445  *
2446  * @param cfg the configuration to use
2447  * @param app_port the application port for which new streams will be accepted
2448  * @param listen_cb this function will be called when a peer tries to establish
2449  *            a stream with us
2450  * @param listen_cb_cls closure for listen_cb
2451  * @return listen socket, NULL for any error
2452  */
2453 struct GNUNET_STREAM_ListenSocket *
2454 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2455                       GNUNET_MESH_ApplicationType app_port,
2456                       GNUNET_STREAM_ListenCallback listen_cb,
2457                       void *listen_cb_cls)
2458 {
2459   /* FIXME: Add variable args for passing configration options? */
2460   struct GNUNET_STREAM_ListenSocket *lsocket;
2461   struct GNUNET_PeerIdentity our_peer_id;
2462
2463   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2464   lsocket->port = app_port;
2465   lsocket->listen_cb = listen_cb;
2466   lsocket->listen_cb_cls = listen_cb_cls;
2467   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2468   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2469   lsocket->mesh = GNUNET_MESH_connect (cfg,
2470                                        10, /* FIXME: QUEUE size as parameter? */
2471                                        lsocket, /* Closure */
2472                                        &new_tunnel_notify,
2473                                        &tunnel_cleaner,
2474                                        server_message_handlers,
2475                                        &app_port);
2476   GNUNET_assert (NULL != lsocket->mesh);
2477   return lsocket;
2478 }
2479
2480
2481 /**
2482  * Closes the listen socket
2483  *
2484  * @param lsocket the listen socket
2485  */
2486 void
2487 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2488 {
2489   /* Close MESH connection */
2490   GNUNET_assert (NULL != lsocket->mesh);
2491   GNUNET_MESH_disconnect (lsocket->mesh);
2492   
2493   GNUNET_free (lsocket);
2494 }
2495
2496
2497 /**
2498  * Tries to write the given data to the stream
2499  *
2500  * @param socket the socket representing a stream
2501  * @param data the data buffer from where the data is written into the stream
2502  * @param size the number of bytes to be written from the data buffer
2503  * @param timeout the timeout period
2504  * @param write_cont the function to call upon writing some bytes into the stream
2505  * @param write_cont_cls the closure
2506  * @return handle to cancel the operation
2507  */
2508 struct GNUNET_STREAM_IOWriteHandle *
2509 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2510                      const void *data,
2511                      size_t size,
2512                      struct GNUNET_TIME_Relative timeout,
2513                      GNUNET_STREAM_CompletionContinuation write_cont,
2514                      void *write_cont_cls)
2515 {
2516   unsigned int num_needed_packets;
2517   unsigned int packet;
2518   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2519   uint32_t packet_size;
2520   uint32_t payload_size;
2521   struct GNUNET_STREAM_DataMessage *data_msg;
2522   const void *sweep;
2523   struct GNUNET_TIME_Relative ack_deadline;
2524
2525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526               "%s\n", __func__);
2527
2528   /* Return NULL if there is already a write request pending */
2529   if (NULL != socket->write_handle)
2530   {
2531     GNUNET_break (0);
2532     return NULL;
2533   }
2534   if (!((STATE_ESTABLISHED == socket->state)
2535         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2536         || (STATE_RECEIVE_CLOSED == socket->state)))
2537     {
2538       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2539                   "%x: Attempting to write on a closed (OR) not-yet-established"
2540                   "stream\n",
2541                   socket->our_id);
2542       return NULL;
2543     } 
2544   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2545     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2546   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2547   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2548   io_handle->write_cont = write_cont;
2549   io_handle->write_cont_cls = write_cont_cls;
2550   io_handle->size = size;
2551   sweep = data;
2552   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2553      determined from RTT */
2554   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2555   /* Divide the given buffer into packets for sending */
2556   for (packet=0; packet < num_needed_packets; packet++)
2557     {
2558       if ((packet + 1) * max_payload_size < size) 
2559         {
2560           payload_size = max_payload_size;
2561           packet_size = MAX_PACKET_SIZE;
2562         }
2563       else 
2564         {
2565           payload_size = size - packet * max_payload_size;
2566           packet_size =  payload_size + sizeof (struct
2567                                                 GNUNET_STREAM_DataMessage); 
2568         }
2569       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2570       io_handle->messages[packet]->header.header.size = htons (packet_size);
2571       io_handle->messages[packet]->header.header.type =
2572         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2573       io_handle->messages[packet]->sequence_number =
2574         htonl (socket->write_sequence_number++);
2575       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2576
2577       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2578          determined from RTT */
2579       io_handle->messages[packet]->ack_deadline =
2580         GNUNET_TIME_relative_hton (ack_deadline);
2581       data_msg = io_handle->messages[packet];
2582       /* Copy data from given buffer to the packet */
2583       memcpy (&data_msg[1],
2584               sweep,
2585               payload_size);
2586       sweep += payload_size;
2587       socket->write_offset += payload_size;
2588     }
2589   socket->write_handle = io_handle;
2590   write_data (socket);
2591
2592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2593               "%s() END\n", __func__);
2594
2595   return io_handle;
2596 }
2597
2598
2599 /**
2600  * Tries to read data from the stream
2601  *
2602  * @param socket the socket representing a stream
2603  * @param timeout the timeout period
2604  * @param proc function to call with data (once only)
2605  * @param proc_cls the closure for proc
2606  * @return handle to cancel the operation
2607  */
2608 struct GNUNET_STREAM_IOReadHandle *
2609 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2610                     struct GNUNET_TIME_Relative timeout,
2611                     GNUNET_STREAM_DataProcessor proc,
2612                     void *proc_cls)
2613 {
2614   struct GNUNET_STREAM_IOReadHandle *read_handle;
2615   
2616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2617               "%x: %s()\n", 
2618               socket->our_id,
2619               __func__);
2620
2621   /* Return NULL if there is already a read handle; the user has to cancel that
2622   first before continuing or has to wait until it is completed */
2623   if (NULL != socket->read_handle) return NULL;
2624
2625   GNUNET_assert (NULL != proc);
2626
2627   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2628   read_handle->proc = proc;
2629   read_handle->proc_cls = proc_cls;
2630   socket->read_handle = read_handle;
2631
2632   /* Check if we have a packet at bitmap 0 */
2633   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2634                                           0))
2635     {
2636       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2637                                                        socket);
2638    
2639     }
2640   
2641   /* Setup the read timeout task */
2642   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2643                                                                &read_io_timeout,
2644                                                                socket);
2645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2646               "%x: %s() END\n",
2647               socket->our_id,
2648               __func__);
2649   return read_handle;
2650 }
2651
2652
2653 /**
2654  * Cancel pending write operation.
2655  *
2656  * @param ioh handle to operation to cancel
2657  */
2658 void
2659 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2660 {
2661   /* FIXME: Should cancel the write retransmission task */
2662   return;
2663 }
2664
2665
2666 /**
2667  * Cancel pending read operation.
2668  *
2669  * @param ioh handle to operation to cancel
2670  */
2671 void
2672 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2673 {
2674   return;
2675 }