fixed read packets removal after read processor,
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * The listen socket from which this socket is derived. Should be NULL if it
235    * is not a derived socket
236    */
237   struct GNUNET_STREAM_ListenSocket *lsocket;
238
239   /**
240    * Task identifier for the read io timeout task
241    */
242   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
243
244   /**
245    * Task identifier for retransmission task after timeout
246    */
247   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
248
249   /**
250    * The task for sending timely Acks
251    */
252   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
253
254   /**
255    * Task scheduled to continue a read operation.
256    */
257   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
258
259   /**
260    * The state of the protocol associated with this socket
261    */
262   enum State state;
263
264   /**
265    * The status of the socket
266    */
267   enum GNUNET_STREAM_Status status;
268
269   /**
270    * The number of previous timeouts; FIXME: currently not used
271    */
272   unsigned int retries;
273
274   /**
275    * The peer identity of the peer at the other end of the stream
276    */
277   GNUNET_PEER_Id other_peer;
278
279   /**
280    * Our Peer Identity (for debugging)
281    */
282   GNUNET_PEER_Id our_id;
283
284   /**
285    * The application port number (type: uint32_t)
286    */
287   GNUNET_MESH_ApplicationType app_port;
288
289   /**
290    * The session id associated with this stream connection
291    * FIXME: Not used currently, may be removed
292    */
293   uint32_t session_id;
294
295   /**
296    * Write sequence number. Set to random when sending HELLO(client) and
297    * HELLO_ACK(server) 
298    */
299   uint32_t write_sequence_number;
300
301   /**
302    * Read sequence number. This number's value is determined during handshake
303    */
304   uint32_t read_sequence_number;
305
306   /**
307    * The receiver buffer size
308    */
309   uint32_t receive_buffer_size;
310
311   /**
312    * The receiver buffer boundaries
313    */
314   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
315
316   /**
317    * receiver's available buffer after the last acknowledged packet
318    */
319   uint32_t receiver_window_available;
320
321   /**
322    * The offset pointer used during write operation
323    */
324   uint32_t write_offset;
325
326   /**
327    * The offset after which we are expecting data
328    */
329   uint32_t read_offset;
330
331   /**
332    * The offset upto which user has read from the received buffer
333    */
334   uint32_t copy_offset;
335 };
336
337
338 /**
339  * A socket for listening
340  */
341 struct GNUNET_STREAM_ListenSocket
342 {
343   /**
344    * The mesh handle
345    */
346   struct GNUNET_MESH_Handle *mesh;
347
348   /**
349    * The callback function which is called after successful opening socket
350    */
351   GNUNET_STREAM_ListenCallback listen_cb;
352
353   /**
354    * The call back closure
355    */
356   void *listen_cb_cls;
357
358   /**
359    * Our interned Peer's identity
360    */
361   GNUNET_PEER_Id our_id;
362
363   /**
364    * The service port
365    * FIXME: Remove if not required!
366    */
367   GNUNET_MESH_ApplicationType port;
368 };
369
370
371 /**
372  * The IO Write Handle
373  */
374 struct GNUNET_STREAM_IOWriteHandle
375 {
376   /**
377    * The packet_buffers associated with this Handle
378    */
379   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
380
381   /**
382    * The write continuation callback
383    */
384   GNUNET_STREAM_CompletionContinuation write_cont;
385
386   /**
387    * Write continuation closure
388    */
389   void *write_cont_cls;
390
391   /**
392    * The bitmap of this IOHandle; Corresponding bit for a message is set when
393    * it has been acknowledged by the receiver
394    */
395   GNUNET_STREAM_AckBitmap ack_bitmap;
396
397   /**
398    * Number of bytes in this write handle
399    */
400   size_t size;
401
402   /**
403    * Number of packets sent before waiting for an ack
404    *
405    * FIXME: Do we need this?
406    */
407   unsigned int sent_packets;
408 };
409
410
411 /**
412  * The IO Read Handle
413  */
414 struct GNUNET_STREAM_IOReadHandle
415 {
416   /**
417    * Callback for the read processor
418    */
419   GNUNET_STREAM_DataProcessor proc;
420
421   /**
422    * The closure pointer for the read processor callback
423    */
424   void *proc_cls;
425 };
426
427
428 /**
429  * Default value in seconds for various timeouts
430  */
431 static unsigned int default_timeout = 10;
432
433 /**
434  * Callback function for sending hello message
435  *
436  * @param cls closure the socket
437  * @param size number of bytes available in buf
438  * @param buf where the callee should write the message
439  * @return number of bytes written to buf
440  */
441 static size_t
442 send_message_notify (void *cls, size_t size, void *buf)
443 {
444   struct GNUNET_STREAM_Socket *socket = cls;
445   struct GNUNET_PeerIdentity target;
446   struct MessageQueue *head;
447   size_t ret;
448
449   socket->transmit_handle = NULL; /* Remove the transmit handle */
450   head = socket->queue_head;
451   if (NULL == head)
452     return 0; /* just to be safe */
453   GNUNET_PEER_resolve (socket->other_peer, &target);
454   if (0 == size)                /* request timed out */
455     {
456       socket->retries++;
457       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458                   "Message sending timed out. Retry %d \n",
459                   socket->retries);
460       socket->transmit_handle = 
461         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
462                                            0, /* Corking */
463                                            1, /* Priority */
464                                            /* FIXME: exponential backoff */
465                                            socket->retransmit_timeout,
466                                            &target,
467                                            ntohs (head->message->header.size),
468                                            &send_message_notify,
469                                            socket);
470       return 0;
471     }
472
473   ret = ntohs (head->message->header.size);
474   GNUNET_assert (size >= ret);
475   memcpy (buf, head->message, ret);
476   if (NULL != head->finish_cb)
477     {
478       head->finish_cb (head->finish_cb_cls, socket);
479     }
480   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
481                                socket->queue_tail,
482                                head);
483   GNUNET_free (head->message);
484   GNUNET_free (head);
485   head = socket->queue_head;
486   if (NULL != head)    /* more pending messages to send */
487     {
488       socket->retries = 0;
489       socket->transmit_handle = 
490         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
491                                            0, /* Corking */
492                                            1, /* Priority */
493                                            /* FIXME: exponential backoff */
494                                            socket->retransmit_timeout,
495                                            &target,
496                                            ntohs (head->message->header.size),
497                                            &send_message_notify,
498                                            socket);
499     }
500   return ret;
501 }
502
503
504 /**
505  * Queues a message for sending using the mesh connection of a socket
506  *
507  * @param socket the socket whose mesh connection is used
508  * @param message the message to be sent
509  * @param finish_cb the callback to be called when the message is sent
510  * @param finish_cb_cls the closure for the callback
511  */
512 static void
513 queue_message (struct GNUNET_STREAM_Socket *socket,
514                struct GNUNET_STREAM_MessageHeader *message,
515                SendFinishCallback finish_cb,
516                void *finish_cb_cls)
517 {
518   struct MessageQueue *queue_entity;
519   struct GNUNET_PeerIdentity target;
520
521   GNUNET_assert 
522     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
523      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
524
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526               "%x: Queueing message of type %d and size %d\n",
527               socket->our_id,
528               ntohs (message->header.type),
529               ntohs (message->header.size));
530   GNUNET_assert (NULL != message);
531   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
532   queue_entity->message = message;
533   queue_entity->finish_cb = finish_cb;
534   queue_entity->finish_cb_cls = finish_cb_cls;
535   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
536                                     socket->queue_tail,
537                                     queue_entity);
538   if (NULL == socket->transmit_handle)
539   {
540     socket->retries = 0;
541     GNUNET_PEER_resolve (socket->other_peer, &target);
542     socket->transmit_handle = 
543       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
544                                          0, /* Corking */
545                                          1, /* Priority */
546                                          socket->retransmit_timeout,
547                                          &target,
548                                          ntohs (message->header.size),
549                                          &send_message_notify,
550                                          socket);
551   }
552 }
553
554
555 /**
556  * Copies a message and queues it for sending using the mesh connection of
557  * given socket 
558  *
559  * @param socket the socket whose mesh connection is used
560  * @param message the message to be sent
561  * @param finish_cb the callback to be called when the message is sent
562  * @param finish_cb_cls the closure for the callback
563  */
564 static void
565 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
566                         const struct GNUNET_STREAM_MessageHeader *message,
567                         SendFinishCallback finish_cb,
568                         void *finish_cb_cls)
569 {
570   struct GNUNET_STREAM_MessageHeader *msg_copy;
571   uint16_t size;
572   
573   size = ntohs (message->header.size);
574   msg_copy = GNUNET_malloc (size);
575   memcpy (msg_copy, message, size);
576   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
577 }
578
579
580 /**
581  * Callback function for sending ack message
582  *
583  * @param cls closure the ACK message created in ack_task
584  * @param size number of bytes available in buffer
585  * @param buf where the callee should write the message
586  * @return number of bytes written to buf
587  */
588 static size_t
589 send_ack_notify (void *cls, size_t size, void *buf)
590 {
591   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
592
593   if (0 == size)
594     {
595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596                   "%s called with size 0\n", __func__);
597       return 0;
598     }
599   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
600   
601   size = ntohs (ack_msg->header.header.size);
602   memcpy (buf, ack_msg, size);
603   return size;
604 }
605
606 /**
607  * Writes data using the given socket. The amount of data written is limited by
608  * the receiver_window_size
609  *
610  * @param socket the socket to use
611  */
612 static void 
613 write_data (struct GNUNET_STREAM_Socket *socket);
614
615 /**
616  * Task for retransmitting data messages if they aren't ACK before their ack
617  * deadline 
618  *
619  * @param cls the socket
620  * @param tc the Task context
621  */
622 static void
623 retransmission_timeout_task (void *cls,
624                              const struct GNUNET_SCHEDULER_TaskContext *tc)
625 {
626   struct GNUNET_STREAM_Socket *socket = cls;
627   
628   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
629     return;
630
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "%x: Retransmitting DATA...\n", socket->our_id);
633   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
634   write_data (socket);
635 }
636
637
638 /**
639  * Task for sending ACK message
640  *
641  * @param cls the socket
642  * @param tc the Task context
643  */
644 static void
645 ack_task (void *cls,
646           const struct GNUNET_SCHEDULER_TaskContext *tc)
647 {
648   struct GNUNET_STREAM_Socket *socket = cls;
649   struct GNUNET_STREAM_AckMessage *ack_msg;
650   struct GNUNET_PeerIdentity target;
651
652   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
653     {
654       return;
655     }
656
657   socket->ack_task_id = 0;
658
659   /* Create the ACK Message */
660   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
661   ack_msg->header.header.size = htons (sizeof (struct 
662                                                GNUNET_STREAM_AckMessage));
663   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
664   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
665   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
666   ack_msg->receive_window_remaining = 
667     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
668
669   GNUNET_PEER_resolve (socket->other_peer, &target);
670   /* Request MESH for sending ACK */
671   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
672                                      0, /* Corking */
673                                      1, /* Priority */
674                                      socket->retransmit_timeout,
675                                      &target,
676                                      ntohs (ack_msg->header.header.size),
677                                      &send_ack_notify,
678                                      ack_msg);
679
680   
681 }
682
683
684 /**
685  * Function to modify a bit in GNUNET_STREAM_AckBitmap
686  *
687  * @param bitmap the bitmap to modify
688  * @param bit the bit number to modify
689  * @param value GNUNET_YES to on, GNUNET_NO to off
690  */
691 static void
692 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
693                       unsigned int bit, 
694                       int value)
695 {
696   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
697   if (GNUNET_YES == value)
698     *bitmap |= (1LL << bit);
699   else
700     *bitmap &= ~(1LL << bit);
701 }
702
703
704 /**
705  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
706  *
707  * @param bitmap address of the bitmap that has to be checked
708  * @param bit the bit number to check
709  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
710  */
711 static uint8_t
712 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
713                       unsigned int bit)
714 {
715   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
716   return 0 != (*bitmap & (1LL << bit));
717 }
718
719
720
721 /**
722  * Function called when Data Message is sent
723  *
724  * @param cls the io_handle corresponding to the Data Message
725  * @param socket the socket which was used
726  */
727 static void
728 write_data_finish_cb (void *cls,
729                       struct GNUNET_STREAM_Socket *socket)
730 {
731   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
732
733   io_handle->sent_packets++;
734 }
735
736
737 /**
738  * Writes data using the given socket. The amount of data written is limited by
739  * the receiver_window_size
740  *
741  * @param socket the socket to use
742  */
743 static void 
744 write_data (struct GNUNET_STREAM_Socket *socket)
745 {
746   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
747   int packet;                   /* Although an int, should never be negative */
748   int ack_packet;
749
750   ack_packet = -1;
751   /* Find the last acknowledged packet */
752   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
753     {      
754       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
755                                               packet))
756         ack_packet = packet;        
757       else if (NULL == io_handle->messages[packet])
758         break;
759     }
760   /* Resend packets which weren't ack'ed */
761   for (packet=0; packet < ack_packet; packet++)
762     {
763       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
764                                              packet))
765         {
766           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767                       "%x: Placing DATA message with sequence %u in send queue\n",
768                       socket->our_id,
769                       ntohl (io_handle->messages[packet]->sequence_number));
770
771           copy_and_queue_message (socket,
772                                   &io_handle->messages[packet]->header,
773                                   NULL,
774                                   NULL);
775         }
776     }
777   packet = ack_packet + 1;
778   /* Now send new packets if there is enough buffer space */
779   while ( (NULL != io_handle->messages[packet]) &&
780           (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
781     {
782       socket->receiver_window_available -= 
783         ntohs (io_handle->messages[packet]->header.header.size);
784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785                   "%x: Placing DATA message with sequence %u in send queue\n",
786                   socket->our_id,
787                   ntohl (io_handle->messages[packet]->sequence_number));
788       copy_and_queue_message (socket,
789                               &io_handle->messages[packet]->header,
790                               &write_data_finish_cb,
791                               io_handle);
792       packet++;
793     }
794
795   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
796     socket->retransmission_timeout_task_id = 
797       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
798                                     (GNUNET_TIME_UNIT_SECONDS, 5),
799                                     &retransmission_timeout_task,
800                                     socket);
801 }
802
803
804 /**
805  * Task for calling the read processor
806  *
807  * @param cls the socket
808  * @param tc the task context
809  */
810 static void
811 call_read_processor (void *cls,
812                           const struct GNUNET_SCHEDULER_TaskContext *tc)
813 {
814   struct GNUNET_STREAM_Socket *socket = cls;
815   size_t read_size;
816   size_t valid_read_size;
817   unsigned int packet;
818   uint32_t sequence_increase;
819   uint32_t offset_increase;
820
821   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
822   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
823     return;
824
825   GNUNET_assert (NULL != socket->read_handle);
826   GNUNET_assert (NULL != socket->read_handle->proc);
827
828   /* Check the bitmap for any holes */
829   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
830     {
831       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
832                                              packet))
833         break;
834     }
835   /* We only call read processor if we have the first packet */
836   GNUNET_assert (0 < packet);
837
838   valid_read_size = 
839     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
840
841   GNUNET_assert (0 != valid_read_size);
842
843   /* Cancel the read_io_timeout_task */
844   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
845   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
846
847   /* Call the data processor */
848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849               "%x: Calling read processor\n",
850               socket->our_id);
851   read_size = 
852     socket->read_handle->proc (socket->read_handle->proc_cls,
853                                socket->status,
854                                socket->receive_buffer + socket->copy_offset,
855                                valid_read_size);
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "%x: Read processor read %d bytes\n",
858               socket->our_id,
859               read_size);
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "%x: Read processor completed successfully\n",
862               socket->our_id);
863
864   /* Free the read handle */
865   GNUNET_free (socket->read_handle);
866   socket->read_handle = NULL;
867
868   GNUNET_assert (read_size <= valid_read_size);
869   socket->copy_offset += read_size;
870
871   /* Determine upto which packet we can remove from the buffer */
872   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
873     {
874       if (socket->copy_offset = socket->receive_buffer_boundaries[packet])
875         { packet++; break; }
876       if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
877         break;
878     }
879
880   /* If no packets can be removed we can't move the buffer */
881   if (0 == packet) return;
882
883   sequence_increase = packet;
884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885               "%x: Sequence increase after read processor completion: %u\n",
886               socket->our_id,
887               sequence_increase);
888
889   /* Shift the data in the receive buffer */
890   memmove (socket->receive_buffer,
891            socket->receive_buffer 
892            + socket->receive_buffer_boundaries[sequence_increase-1],
893            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
894   
895   /* Shift the bitmap */
896   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
897   
898   /* Set read_sequence_number */
899   socket->read_sequence_number += sequence_increase;
900   
901   /* Set read_offset */
902   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
903   socket->read_offset += offset_increase;
904
905   /* Fix copy_offset */
906   GNUNET_assert (offset_increase <= socket->copy_offset);
907   socket->copy_offset -= offset_increase;
908   
909   /* Fix relative boundaries */
910   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
911     {
912       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
913         {
914           socket->receive_buffer_boundaries[packet] = 
915             socket->receive_buffer_boundaries[packet + sequence_increase] 
916             - offset_increase;
917         }
918       else
919         socket->receive_buffer_boundaries[packet] = 0;
920     }
921 }
922
923
924 /**
925  * Cancels the existing read io handle
926  *
927  * @param cls the closure from the SCHEDULER call
928  * @param tc the task context
929  */
930 static void
931 read_io_timeout (void *cls, 
932                 const struct GNUNET_SCHEDULER_TaskContext *tc)
933 {
934   struct GNUNET_STREAM_Socket *socket = cls;
935   GNUNET_STREAM_DataProcessor proc;
936   void *proc_cls;
937
938   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
939   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
940   {
941     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942                 "%x: Read task timedout - Cancelling it\n",
943                 socket->our_id);
944     GNUNET_SCHEDULER_cancel (socket->read_task_id);
945     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
946   }
947   GNUNET_assert (NULL != socket->read_handle);
948   proc = socket->read_handle->proc;
949   proc_cls = socket->read_handle->proc_cls;
950
951   GNUNET_free (socket->read_handle);
952   socket->read_handle = NULL;
953   /* Call the read processor to signal timeout */
954   proc (proc_cls,
955         GNUNET_STREAM_TIMEOUT,
956         NULL,
957         0);
958 }
959
960
961 /**
962  * Handler for DATA messages; Same for both client and server
963  *
964  * @param socket the socket through which the ack was received
965  * @param tunnel connection to the other end
966  * @param sender who sent the message
967  * @param msg the data message
968  * @param atsi performance data for the connection
969  * @return GNUNET_OK to keep the connection open,
970  *         GNUNET_SYSERR to close it (signal serious error)
971  */
972 static int
973 handle_data (struct GNUNET_STREAM_Socket *socket,
974              struct GNUNET_MESH_Tunnel *tunnel,
975              const struct GNUNET_PeerIdentity *sender,
976              const struct GNUNET_STREAM_DataMessage *msg,
977              const struct GNUNET_ATS_Information*atsi)
978 {
979   const void *payload;
980   uint32_t bytes_needed;
981   uint32_t relative_offset;
982   uint32_t relative_sequence_number;
983   uint16_t size;
984
985   size = htons (msg->header.header.size);
986   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
987     {
988       GNUNET_break_op (0);
989       return GNUNET_SYSERR;
990     }
991
992   if (GNUNET_PEER_search (sender) != socket->other_peer)
993     {
994       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995                   "%x: Received DATA from non-confirming peer\n",
996                   socket->our_id);
997       return GNUNET_YES;
998     }
999
1000   switch (socket->state)
1001     {
1002     case STATE_ESTABLISHED:
1003     case STATE_TRANSMIT_CLOSED:
1004     case STATE_TRANSMIT_CLOSE_WAIT:
1005       
1006       /* check if the message's sequence number is in the range we are
1007          expecting */
1008       relative_sequence_number = 
1009         ntohl (msg->sequence_number) - socket->read_sequence_number;
1010       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1011         {
1012           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1013                       "%x: Ignoring received message with sequence number %u\n",
1014                       socket->our_id,
1015                       ntohl (msg->sequence_number));
1016           /* Start ACK sending task if one is not already present */
1017           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1018             {
1019               socket->ack_task_id = 
1020                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1021                                               (msg->ack_deadline),
1022                                               &ack_task,
1023                                               socket);
1024             }
1025           return GNUNET_YES;
1026         }
1027       
1028       /* Check if we have already seen this message */
1029       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1030                                               relative_sequence_number))
1031         {
1032           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033                       "%x: Ignoring already received message with sequence "
1034                       "number %u\n",
1035                       socket->our_id,
1036                       ntohl (msg->sequence_number));
1037           /* Start ACK sending task if one is not already present */
1038           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1039             {
1040               socket->ack_task_id = 
1041                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1042                                               (msg->ack_deadline),
1043                                               &ack_task,
1044                                               socket);
1045             }
1046           return GNUNET_YES;
1047         }
1048
1049       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1050                   "%x: Receiving DATA with sequence number: %u and size: %d "
1051                   "from %x\n",
1052                   socket->our_id,
1053                   ntohl (msg->sequence_number),
1054                   ntohs (msg->header.header.size),
1055                   socket->other_peer);
1056
1057       /* Check if we have to allocate the buffer */
1058       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1059       relative_offset = ntohl (msg->offset) - socket->read_offset;
1060       bytes_needed = relative_offset + size;
1061       if (bytes_needed > socket->receive_buffer_size)
1062         {
1063           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1064             {
1065               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1066                                                        bytes_needed);
1067               socket->receive_buffer_size = bytes_needed;
1068             }
1069           else
1070             {
1071               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072                           "%x: Cannot accommodate packet %d as buffer is",
1073                           "full\n",
1074                           socket->our_id,
1075                           ntohl (msg->sequence_number));
1076               return GNUNET_YES;
1077             }
1078         }
1079       
1080       /* Copy Data to buffer */
1081       payload = &msg[1];
1082       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1083       memcpy (socket->receive_buffer + relative_offset,
1084               payload,
1085               size);
1086       socket->receive_buffer_boundaries[relative_sequence_number] = 
1087         relative_offset + size;
1088       
1089       /* Modify the ACK bitmap */
1090       ackbitmap_modify_bit (&socket->ack_bitmap,
1091                             relative_sequence_number,
1092                             GNUNET_YES);
1093
1094       /* Start ACK sending task if one is not already present */
1095       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1096        {
1097          socket->ack_task_id = 
1098            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1099                                          (msg->ack_deadline),
1100                                          &ack_task,
1101                                          socket);
1102        }
1103
1104       if ((NULL != socket->read_handle) /* A read handle is waiting */
1105           /* There is no current read task */
1106           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1107           /* We have the first packet */
1108           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1109                                                  0)))
1110         {
1111           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112                       "%x: Scheduling read processor\n",
1113                       socket->our_id);
1114
1115           socket->read_task_id = 
1116             GNUNET_SCHEDULER_add_now (&call_read_processor,
1117                                       socket);
1118         }
1119       
1120       break;
1121
1122     default:
1123       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                   "%x: Received data message when it cannot be handled\n",
1125                   socket->our_id);
1126       break;
1127     }
1128   return GNUNET_YES;
1129 }
1130
1131 /**
1132  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1133  *
1134  * @param cls the socket (set from GNUNET_MESH_connect)
1135  * @param tunnel connection to the other end
1136  * @param tunnel_ctx place to store local state associated with the tunnel
1137  * @param sender who sent the message
1138  * @param message the actual message
1139  * @param atsi performance data for the connection
1140  * @return GNUNET_OK to keep the connection open,
1141  *         GNUNET_SYSERR to close it (signal serious error)
1142  */
1143 static int
1144 client_handle_data (void *cls,
1145              struct GNUNET_MESH_Tunnel *tunnel,
1146              void **tunnel_ctx,
1147              const struct GNUNET_PeerIdentity *sender,
1148              const struct GNUNET_MessageHeader *message,
1149              const struct GNUNET_ATS_Information*atsi)
1150 {
1151   struct GNUNET_STREAM_Socket *socket = cls;
1152
1153   return handle_data (socket, 
1154                       tunnel, 
1155                       sender, 
1156                       (const struct GNUNET_STREAM_DataMessage *) message, 
1157                       atsi);
1158 }
1159
1160
1161 /**
1162  * Callback to set state to ESTABLISHED
1163  *
1164  * @param cls the closure from queue_message FIXME: document
1165  * @param socket the socket to requiring state change
1166  */
1167 static void
1168 set_state_established (void *cls,
1169                        struct GNUNET_STREAM_Socket *socket)
1170 {
1171   struct GNUNET_PeerIdentity initiator_pid;
1172
1173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1174               "%x: Attaining ESTABLISHED state\n",
1175               socket->our_id);
1176   socket->write_offset = 0;
1177   socket->read_offset = 0;
1178   socket->state = STATE_ESTABLISHED;
1179   /* FIXME: What if listen_cb is NULL */
1180   if (NULL != socket->lsocket)
1181     {
1182       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1183       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184                   "%x: Calling listen callback\n",
1185                   socket->our_id);
1186       if (GNUNET_SYSERR == 
1187           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1188                                       socket,
1189                                       &initiator_pid))
1190         {
1191           socket->state = STATE_CLOSED;
1192           /* FIXME: We should close in a decent way */
1193           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1194           GNUNET_free (socket);
1195         }
1196     }
1197   else if (socket->open_cb)
1198     socket->open_cb (socket->open_cls, socket);
1199 }
1200
1201
1202 /**
1203  * Callback to set state to HELLO_WAIT
1204  *
1205  * @param cls the closure from queue_message
1206  * @param socket the socket to requiring state change
1207  */
1208 static void
1209 set_state_hello_wait (void *cls,
1210                       struct GNUNET_STREAM_Socket *socket)
1211 {
1212   GNUNET_assert (STATE_INIT == socket->state);
1213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1214               "%x: Attaining HELLO_WAIT state\n",
1215               socket->our_id);
1216   socket->state = STATE_HELLO_WAIT;
1217 }
1218
1219
1220 /**
1221  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1222  *
1223  * @param cls the socket (set from GNUNET_MESH_connect)
1224  * @param tunnel connection to the other end
1225  * @param tunnel_ctx this is NULL
1226  * @param sender who sent the message
1227  * @param message the actual message
1228  * @param atsi performance data for the connection
1229  * @return GNUNET_OK to keep the connection open,
1230  *         GNUNET_SYSERR to close it (signal serious error)
1231  */
1232 static int
1233 client_handle_hello_ack (void *cls,
1234                          struct GNUNET_MESH_Tunnel *tunnel,
1235                          void **tunnel_ctx,
1236                          const struct GNUNET_PeerIdentity *sender,
1237                          const struct GNUNET_MessageHeader *message,
1238                          const struct GNUNET_ATS_Information*atsi)
1239 {
1240   struct GNUNET_STREAM_Socket *socket = cls;
1241   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1242   struct GNUNET_STREAM_HelloAckMessage *reply;
1243
1244   if (GNUNET_PEER_search (sender) != socket->other_peer)
1245     {
1246       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247                   "%x: Received HELLO_ACK from non-confirming peer\n",
1248                   socket->our_id);
1249       return GNUNET_YES;
1250     }
1251   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1253               "%x: Received HELLO_ACK from %x\n",
1254               socket->our_id,
1255               socket->other_peer);
1256
1257   GNUNET_assert (socket->tunnel == tunnel);
1258   switch (socket->state)
1259   {
1260   case STATE_HELLO_WAIT:
1261     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1262     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1263                 "%x: Read sequence number %u\n",
1264                 socket->our_id,
1265                 (unsigned int) socket->read_sequence_number);
1266     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1267     /* Get the random sequence number */
1268     socket->write_sequence_number = 
1269       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1270       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1271                   "%x: Generated write sequence number %u\n",
1272                   socket->our_id,
1273                   (unsigned int) socket->write_sequence_number);
1274     reply = 
1275       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1276     reply->header.header.size = 
1277       htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1278     reply->header.header.type = 
1279       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1280     reply->sequence_number = htonl (socket->write_sequence_number);
1281     reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1282     queue_message (socket, 
1283                    &reply->header, 
1284                    &set_state_established, 
1285                    NULL);      
1286     return GNUNET_OK;
1287   case STATE_ESTABLISHED:
1288   case STATE_RECEIVE_CLOSE_WAIT:
1289     // call statistics (# ACKs ignored++)
1290     return GNUNET_OK;
1291   case STATE_INIT:
1292   default:
1293     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1295                 socket->our_id,
1296                 socket->other_peer,
1297                 socket->state);
1298     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1299     return GNUNET_SYSERR;
1300   }
1301
1302 }
1303
1304
1305 /**
1306  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1307  *
1308  * @param cls the socket (set from GNUNET_MESH_connect)
1309  * @param tunnel connection to the other end
1310  * @param tunnel_ctx this is NULL
1311  * @param sender who sent the message
1312  * @param message the actual message
1313  * @param atsi performance data for the connection
1314  * @return GNUNET_OK to keep the connection open,
1315  *         GNUNET_SYSERR to close it (signal serious error)
1316  */
1317 static int
1318 client_handle_reset (void *cls,
1319                      struct GNUNET_MESH_Tunnel *tunnel,
1320                      void **tunnel_ctx,
1321                      const struct GNUNET_PeerIdentity *sender,
1322                      const struct GNUNET_MessageHeader *message,
1323                      const struct GNUNET_ATS_Information*atsi)
1324 {
1325   struct GNUNET_STREAM_Socket *socket = cls;
1326
1327   return GNUNET_OK;
1328 }
1329
1330
1331 /**
1332  * Common message handler for handling TRANSMIT_CLOSE messages
1333  *
1334  * @param socket the socket through which the ack was received
1335  * @param tunnel connection to the other end
1336  * @param sender who sent the message
1337  * @param msg the transmit close message
1338  * @param atsi performance data for the connection
1339  * @return GNUNET_OK to keep the connection open,
1340  *         GNUNET_SYSERR to close it (signal serious error)
1341  */
1342 static int
1343 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1344                        struct GNUNET_MESH_Tunnel *tunnel,
1345                        const struct GNUNET_PeerIdentity *sender,
1346                        const struct GNUNET_STREAM_MessageHeader *msg,
1347                        const struct GNUNET_ATS_Information*atsi)
1348 {
1349   struct GNUNET_STREAM_MessageHeader *reply;
1350
1351   switch (socket->state)
1352     {
1353     case STATE_ESTABLISHED:
1354       socket->state = STATE_RECEIVE_CLOSED;
1355
1356       /* Send TRANSMIT_CLOSE_ACK */
1357       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1358       reply->header.type = 
1359         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1360       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1361       queue_message (socket, reply, NULL, NULL);
1362       break;
1363
1364     default:
1365       /* FIXME: Call statistics? */
1366       break;
1367     }
1368   return GNUNET_YES;
1369 }
1370
1371
1372 /**
1373  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1374  *
1375  * @param cls the socket (set from GNUNET_MESH_connect)
1376  * @param tunnel connection to the other end
1377  * @param tunnel_ctx this is NULL
1378  * @param sender who sent the message
1379  * @param message the actual message
1380  * @param atsi performance data for the connection
1381  * @return GNUNET_OK to keep the connection open,
1382  *         GNUNET_SYSERR to close it (signal serious error)
1383  */
1384 static int
1385 client_handle_transmit_close (void *cls,
1386                               struct GNUNET_MESH_Tunnel *tunnel,
1387                               void **tunnel_ctx,
1388                               const struct GNUNET_PeerIdentity *sender,
1389                               const struct GNUNET_MessageHeader *message,
1390                               const struct GNUNET_ATS_Information*atsi)
1391 {
1392   struct GNUNET_STREAM_Socket *socket = cls;
1393   
1394   return handle_transmit_close (socket,
1395                                 tunnel,
1396                                 sender,
1397                                 (struct GNUNET_STREAM_MessageHeader *)message,
1398                                 atsi);
1399 }
1400
1401
1402 /**
1403  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1404  *
1405  * @param cls the socket (set from GNUNET_MESH_connect)
1406  * @param tunnel connection to the other end
1407  * @param tunnel_ctx this is NULL
1408  * @param sender who sent the message
1409  * @param message the actual message
1410  * @param atsi performance data for the connection
1411  * @return GNUNET_OK to keep the connection open,
1412  *         GNUNET_SYSERR to close it (signal serious error)
1413  */
1414 static int
1415 client_handle_transmit_close_ack (void *cls,
1416                                   struct GNUNET_MESH_Tunnel *tunnel,
1417                                   void **tunnel_ctx,
1418                                   const struct GNUNET_PeerIdentity *sender,
1419                                   const struct GNUNET_MessageHeader *message,
1420                                   const struct GNUNET_ATS_Information*atsi)
1421 {
1422   struct GNUNET_STREAM_Socket *socket = cls;
1423
1424   return GNUNET_OK;
1425 }
1426
1427
1428 /**
1429  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1430  *
1431  * @param cls the socket (set from GNUNET_MESH_connect)
1432  * @param tunnel connection to the other end
1433  * @param tunnel_ctx this is NULL
1434  * @param sender who sent the message
1435  * @param message the actual message
1436  * @param atsi performance data for the connection
1437  * @return GNUNET_OK to keep the connection open,
1438  *         GNUNET_SYSERR to close it (signal serious error)
1439  */
1440 static int
1441 client_handle_receive_close (void *cls,
1442                              struct GNUNET_MESH_Tunnel *tunnel,
1443                              void **tunnel_ctx,
1444                              const struct GNUNET_PeerIdentity *sender,
1445                              const struct GNUNET_MessageHeader *message,
1446                              const struct GNUNET_ATS_Information*atsi)
1447 {
1448   struct GNUNET_STREAM_Socket *socket = cls;
1449
1450   return GNUNET_OK;
1451 }
1452
1453
1454 /**
1455  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1456  *
1457  * @param cls the socket (set from GNUNET_MESH_connect)
1458  * @param tunnel connection to the other end
1459  * @param tunnel_ctx this is NULL
1460  * @param sender who sent the message
1461  * @param message the actual message
1462  * @param atsi performance data for the connection
1463  * @return GNUNET_OK to keep the connection open,
1464  *         GNUNET_SYSERR to close it (signal serious error)
1465  */
1466 static int
1467 client_handle_receive_close_ack (void *cls,
1468                                  struct GNUNET_MESH_Tunnel *tunnel,
1469                                  void **tunnel_ctx,
1470                                  const struct GNUNET_PeerIdentity *sender,
1471                                  const struct GNUNET_MessageHeader *message,
1472                                  const struct GNUNET_ATS_Information*atsi)
1473 {
1474   struct GNUNET_STREAM_Socket *socket = cls;
1475
1476   return GNUNET_OK;
1477 }
1478
1479
1480 /**
1481  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1482  *
1483  * @param cls the socket (set from GNUNET_MESH_connect)
1484  * @param tunnel connection to the other end
1485  * @param tunnel_ctx this is NULL
1486  * @param sender who sent the message
1487  * @param message the actual message
1488  * @param atsi performance data for the connection
1489  * @return GNUNET_OK to keep the connection open,
1490  *         GNUNET_SYSERR to close it (signal serious error)
1491  */
1492 static int
1493 client_handle_close (void *cls,
1494                      struct GNUNET_MESH_Tunnel *tunnel,
1495                      void **tunnel_ctx,
1496                      const struct GNUNET_PeerIdentity *sender,
1497                      const struct GNUNET_MessageHeader *message,
1498                      const struct GNUNET_ATS_Information*atsi)
1499 {
1500   struct GNUNET_STREAM_Socket *socket = cls;
1501
1502   return GNUNET_OK;
1503 }
1504
1505
1506 /**
1507  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1508  *
1509  * @param cls the socket (set from GNUNET_MESH_connect)
1510  * @param tunnel connection to the other end
1511  * @param tunnel_ctx this is NULL
1512  * @param sender who sent the message
1513  * @param message the actual message
1514  * @param atsi performance data for the connection
1515  * @return GNUNET_OK to keep the connection open,
1516  *         GNUNET_SYSERR to close it (signal serious error)
1517  */
1518 static int
1519 client_handle_close_ack (void *cls,
1520                          struct GNUNET_MESH_Tunnel *tunnel,
1521                          void **tunnel_ctx,
1522                          const struct GNUNET_PeerIdentity *sender,
1523                          const struct GNUNET_MessageHeader *message,
1524                          const struct GNUNET_ATS_Information*atsi)
1525 {
1526   struct GNUNET_STREAM_Socket *socket = cls;
1527
1528   return GNUNET_OK;
1529 }
1530
1531 /*****************************/
1532 /* Server's Message Handlers */
1533 /*****************************/
1534
1535 /**
1536  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1537  *
1538  * @param cls the closure
1539  * @param tunnel connection to the other end
1540  * @param tunnel_ctx the socket
1541  * @param sender who sent the message
1542  * @param message the actual message
1543  * @param atsi performance data for the connection
1544  * @return GNUNET_OK to keep the connection open,
1545  *         GNUNET_SYSERR to close it (signal serious error)
1546  */
1547 static int
1548 server_handle_data (void *cls,
1549                     struct GNUNET_MESH_Tunnel *tunnel,
1550                     void **tunnel_ctx,
1551                     const struct GNUNET_PeerIdentity *sender,
1552                     const struct GNUNET_MessageHeader *message,
1553                     const struct GNUNET_ATS_Information*atsi)
1554 {
1555   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1556
1557   return handle_data (socket,
1558                       tunnel,
1559                       sender,
1560                       (const struct GNUNET_STREAM_DataMessage *)message,
1561                       atsi);
1562 }
1563
1564
1565 /**
1566  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1567  *
1568  * @param cls the closure
1569  * @param tunnel connection to the other end
1570  * @param tunnel_ctx the socket
1571  * @param sender who sent the message
1572  * @param message the actual message
1573  * @param atsi performance data for the connection
1574  * @return GNUNET_OK to keep the connection open,
1575  *         GNUNET_SYSERR to close it (signal serious error)
1576  */
1577 static int
1578 server_handle_hello (void *cls,
1579                      struct GNUNET_MESH_Tunnel *tunnel,
1580                      void **tunnel_ctx,
1581                      const struct GNUNET_PeerIdentity *sender,
1582                      const struct GNUNET_MessageHeader *message,
1583                      const struct GNUNET_ATS_Information*atsi)
1584 {
1585   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1586   struct GNUNET_STREAM_HelloAckMessage *reply;
1587
1588   if (GNUNET_PEER_search (sender) != socket->other_peer)
1589     {
1590       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1591                   "%x: Received HELLO from non-confirming peer\n",
1592                   socket->our_id);
1593       return GNUNET_YES;
1594     }
1595
1596   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1597                  ntohs (message->type));
1598   GNUNET_assert (socket->tunnel == tunnel);
1599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1600               "%x: Received HELLO from %x\n", 
1601               socket->our_id,
1602               socket->other_peer);
1603
1604   if (STATE_INIT == socket->state)
1605     {
1606       /* Get the random sequence number */
1607       socket->write_sequence_number = 
1608         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1610                   "%x: Generated write sequence number %u\n",
1611                   socket->our_id,
1612                   (unsigned int) socket->write_sequence_number);
1613       reply = 
1614         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1615       reply->header.header.size = 
1616         htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1617       reply->header.header.type = 
1618         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1619       reply->sequence_number = htonl (socket->write_sequence_number);
1620       reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1621       queue_message (socket, 
1622                      &reply->header,
1623                      &set_state_hello_wait, 
1624                      NULL);
1625     }
1626   else
1627     {
1628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1629                   "Client sent HELLO when in state %d\n", socket->state);
1630       /* FIXME: Send RESET? */
1631       
1632     }
1633   return GNUNET_OK;
1634 }
1635
1636
1637 /**
1638  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1639  *
1640  * @param cls the closure
1641  * @param tunnel connection to the other end
1642  * @param tunnel_ctx the socket
1643  * @param sender who sent the message
1644  * @param message the actual message
1645  * @param atsi performance data for the connection
1646  * @return GNUNET_OK to keep the connection open,
1647  *         GNUNET_SYSERR to close it (signal serious error)
1648  */
1649 static int
1650 server_handle_hello_ack (void *cls,
1651                          struct GNUNET_MESH_Tunnel *tunnel,
1652                          void **tunnel_ctx,
1653                          const struct GNUNET_PeerIdentity *sender,
1654                          const struct GNUNET_MessageHeader *message,
1655                          const struct GNUNET_ATS_Information*atsi)
1656 {
1657   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1658   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1659
1660   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1661                  ntohs (message->type));
1662   GNUNET_assert (socket->tunnel == tunnel);
1663   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1664   if (STATE_HELLO_WAIT == socket->state)
1665     {
1666       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1667                   "%x: Received HELLO_ACK from %x\n",
1668                   socket->our_id,
1669                   socket->other_peer);
1670       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1671       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672                   "%x: Read sequence number %u\n",
1673                   socket->our_id,
1674                   (unsigned int) socket->read_sequence_number);
1675       socket->receiver_window_available = 
1676         ntohl (ack_message->receiver_window_size);
1677       /* Attain ESTABLISHED state */
1678       set_state_established (NULL, socket);
1679     }
1680   else
1681     {
1682       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1684       /* FIXME: Send RESET? */
1685       
1686     }
1687   return GNUNET_OK;
1688 }
1689
1690
1691 /**
1692  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1693  *
1694  * @param cls the closure
1695  * @param tunnel connection to the other end
1696  * @param tunnel_ctx the socket
1697  * @param sender who sent the message
1698  * @param message the actual message
1699  * @param atsi performance data for the connection
1700  * @return GNUNET_OK to keep the connection open,
1701  *         GNUNET_SYSERR to close it (signal serious error)
1702  */
1703 static int
1704 server_handle_reset (void *cls,
1705                      struct GNUNET_MESH_Tunnel *tunnel,
1706                      void **tunnel_ctx,
1707                      const struct GNUNET_PeerIdentity *sender,
1708                      const struct GNUNET_MessageHeader *message,
1709                      const struct GNUNET_ATS_Information*atsi)
1710 {
1711   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1712
1713   return GNUNET_OK;
1714 }
1715
1716
1717 /**
1718  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1719  *
1720  * @param cls the closure
1721  * @param tunnel connection to the other end
1722  * @param tunnel_ctx the socket
1723  * @param sender who sent the message
1724  * @param message the actual message
1725  * @param atsi performance data for the connection
1726  * @return GNUNET_OK to keep the connection open,
1727  *         GNUNET_SYSERR to close it (signal serious error)
1728  */
1729 static int
1730 server_handle_transmit_close (void *cls,
1731                               struct GNUNET_MESH_Tunnel *tunnel,
1732                               void **tunnel_ctx,
1733                               const struct GNUNET_PeerIdentity *sender,
1734                               const struct GNUNET_MessageHeader *message,
1735                               const struct GNUNET_ATS_Information*atsi)
1736 {
1737   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1738
1739   return handle_transmit_close (socket,
1740                                 tunnel,
1741                                 sender,
1742                                 (struct GNUNET_STREAM_MessageHeader *)message,
1743                                 atsi);
1744 }
1745
1746
1747 /**
1748  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1749  *
1750  * @param cls the closure
1751  * @param tunnel connection to the other end
1752  * @param tunnel_ctx the socket
1753  * @param sender who sent the message
1754  * @param message the actual message
1755  * @param atsi performance data for the connection
1756  * @return GNUNET_OK to keep the connection open,
1757  *         GNUNET_SYSERR to close it (signal serious error)
1758  */
1759 static int
1760 server_handle_transmit_close_ack (void *cls,
1761                                   struct GNUNET_MESH_Tunnel *tunnel,
1762                                   void **tunnel_ctx,
1763                                   const struct GNUNET_PeerIdentity *sender,
1764                                   const struct GNUNET_MessageHeader *message,
1765                                   const struct GNUNET_ATS_Information*atsi)
1766 {
1767   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1768
1769   return GNUNET_OK;
1770 }
1771
1772
1773 /**
1774  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1775  *
1776  * @param cls the closure
1777  * @param tunnel connection to the other end
1778  * @param tunnel_ctx the socket
1779  * @param sender who sent the message
1780  * @param message the actual message
1781  * @param atsi performance data for the connection
1782  * @return GNUNET_OK to keep the connection open,
1783  *         GNUNET_SYSERR to close it (signal serious error)
1784  */
1785 static int
1786 server_handle_receive_close (void *cls,
1787                              struct GNUNET_MESH_Tunnel *tunnel,
1788                              void **tunnel_ctx,
1789                              const struct GNUNET_PeerIdentity *sender,
1790                              const struct GNUNET_MessageHeader *message,
1791                              const struct GNUNET_ATS_Information*atsi)
1792 {
1793   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1794
1795   return GNUNET_OK;
1796 }
1797
1798
1799 /**
1800  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1801  *
1802  * @param cls the closure
1803  * @param tunnel connection to the other end
1804  * @param tunnel_ctx the socket
1805  * @param sender who sent the message
1806  * @param message the actual message
1807  * @param atsi performance data for the connection
1808  * @return GNUNET_OK to keep the connection open,
1809  *         GNUNET_SYSERR to close it (signal serious error)
1810  */
1811 static int
1812 server_handle_receive_close_ack (void *cls,
1813                                  struct GNUNET_MESH_Tunnel *tunnel,
1814                                  void **tunnel_ctx,
1815                                  const struct GNUNET_PeerIdentity *sender,
1816                                  const struct GNUNET_MessageHeader *message,
1817                                  const struct GNUNET_ATS_Information*atsi)
1818 {
1819   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1820
1821   return GNUNET_OK;
1822 }
1823
1824
1825 /**
1826  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1827  *
1828  * @param cls the closure
1829  * @param tunnel connection to the other end
1830  * @param tunnel_ctx the socket
1831  * @param sender who sent the message
1832  * @param message the actual message
1833  * @param atsi performance data for the connection
1834  * @return GNUNET_OK to keep the connection open,
1835  *         GNUNET_SYSERR to close it (signal serious error)
1836  */
1837 static int
1838 server_handle_close (void *cls,
1839                      struct GNUNET_MESH_Tunnel *tunnel,
1840                      void **tunnel_ctx,
1841                      const struct GNUNET_PeerIdentity *sender,
1842                      const struct GNUNET_MessageHeader *message,
1843                      const struct GNUNET_ATS_Information*atsi)
1844 {
1845   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1846
1847   return GNUNET_OK;
1848 }
1849
1850
1851 /**
1852  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1853  *
1854  * @param cls the closure
1855  * @param tunnel connection to the other end
1856  * @param tunnel_ctx the socket
1857  * @param sender who sent the message
1858  * @param message the actual message
1859  * @param atsi performance data for the connection
1860  * @return GNUNET_OK to keep the connection open,
1861  *         GNUNET_SYSERR to close it (signal serious error)
1862  */
1863 static int
1864 server_handle_close_ack (void *cls,
1865                          struct GNUNET_MESH_Tunnel *tunnel,
1866                          void **tunnel_ctx,
1867                          const struct GNUNET_PeerIdentity *sender,
1868                          const struct GNUNET_MessageHeader *message,
1869                          const struct GNUNET_ATS_Information*atsi)
1870 {
1871   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1872
1873   return GNUNET_OK;
1874 }
1875
1876
1877 /**
1878  * Message Handler for mesh
1879  *
1880  * @param socket the socket through which the ack was received
1881  * @param tunnel connection to the other end
1882  * @param sender who sent the message
1883  * @param ack the acknowledgment message
1884  * @param atsi performance data for the connection
1885  * @return GNUNET_OK to keep the connection open,
1886  *         GNUNET_SYSERR to close it (signal serious error)
1887  */
1888 static int
1889 handle_ack (struct GNUNET_STREAM_Socket *socket,
1890             struct GNUNET_MESH_Tunnel *tunnel,
1891             const struct GNUNET_PeerIdentity *sender,
1892             const struct GNUNET_STREAM_AckMessage *ack,
1893             const struct GNUNET_ATS_Information*atsi)
1894 {
1895   unsigned int packet;
1896   int need_retransmission;
1897   
1898
1899   if (GNUNET_PEER_search (sender) != socket->other_peer)
1900     {
1901       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1902                   "%x: Received ACK from non-confirming peer\n",
1903                   socket->our_id);
1904       return GNUNET_YES;
1905     }
1906
1907   switch (socket->state)
1908     {
1909     case (STATE_ESTABLISHED):
1910       if (NULL == socket->write_handle)
1911         {
1912           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1913                       "%x: Received DATA_ACK when write_handle is NULL\n",
1914                       socket->our_id);
1915           return GNUNET_OK;
1916         }
1917       /* FIXME: increment in the base sequence number is breaking current flow
1918        */
1919       if (!((socket->write_sequence_number 
1920              - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1921         {
1922           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1923                       "%x: Received DATA_ACK with unexpected base sequence "
1924                       "number\n",
1925                       socket->our_id);
1926           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1927                       "%x: Current write sequence: %u; Ack's base sequence: %u\n",
1928                       socket->our_id,
1929                       socket->write_sequence_number,
1930                       ntohl (ack->base_sequence_number));
1931           return GNUNET_OK;
1932         }
1933       /* FIXME: include the case when write_handle is cancelled - ignore the 
1934          acks */
1935
1936       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1937                   "%x: Received DATA_ACK from %x\n",
1938                   socket->our_id,
1939                   socket->other_peer);
1940       
1941       /* Cancel the retransmission task */
1942       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1943         {
1944           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1945           socket->retransmission_timeout_task_id = 
1946             GNUNET_SCHEDULER_NO_TASK;
1947         }
1948
1949       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1950         {
1951           if (NULL == socket->write_handle->messages[packet]) break;
1952           if (ntohl (ack->base_sequence_number)
1953               >= ntohl (socket->write_handle->messages[packet]->sequence_number))
1954             ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1955                                   packet,
1956                                   GNUNET_YES);
1957           else
1958             if (GNUNET_YES == 
1959                 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
1960                                       ntohl (socket->write_handle->messages[packet]->sequence_number)
1961                                       - ntohl (ack->base_sequence_number)))
1962               ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
1963                                     packet,
1964                                     GNUNET_YES);
1965         }
1966
1967       /* Update the receive window remaining
1968        FIXME : Should update with the value from a data ack with greater
1969        sequence number */
1970       socket->receiver_window_available = 
1971         ntohl (ack->receive_window_remaining);
1972
1973       /* Check if we have received all acknowledgements */
1974       need_retransmission = GNUNET_NO;
1975       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1976         {
1977           if (NULL == socket->write_handle->messages[packet]) break;
1978           if (GNUNET_YES != ackbitmap_is_bit_set 
1979               (&socket->write_handle->ack_bitmap,packet))
1980             {
1981               need_retransmission = GNUNET_YES;
1982               break;
1983             }
1984         }
1985       if (GNUNET_YES == need_retransmission)
1986         {
1987           write_data (socket);
1988         }
1989       else      /* We have to call the write continuation callback now */
1990         {
1991           /* Free the packets */
1992           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1993             {
1994               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1995             }
1996           if (NULL != socket->write_handle->write_cont)
1997             socket->write_handle->write_cont
1998               (socket->write_handle->write_cont_cls,
1999                socket->status,
2000                socket->write_handle->size);
2001           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2002                       "%x: Write completion callback completed\n",
2003                       socket->our_id);
2004           /* We are done with the write handle - Freeing it */
2005           GNUNET_free (socket->write_handle);
2006           socket->write_handle = NULL;
2007         }
2008       break;
2009     default:
2010       break;
2011     }
2012   return GNUNET_OK;
2013 }
2014
2015
2016 /**
2017  * Message Handler for mesh
2018  *
2019  * @param cls the 'struct GNUNET_STREAM_Socket'
2020  * @param tunnel connection to the other end
2021  * @param tunnel_ctx unused
2022  * @param sender who sent the message
2023  * @param message the actual message
2024  * @param atsi performance data for the connection
2025  * @return GNUNET_OK to keep the connection open,
2026  *         GNUNET_SYSERR to close it (signal serious error)
2027  */
2028 static int
2029 client_handle_ack (void *cls,
2030                    struct GNUNET_MESH_Tunnel *tunnel,
2031                    void **tunnel_ctx,
2032                    const struct GNUNET_PeerIdentity *sender,
2033                    const struct GNUNET_MessageHeader *message,
2034                    const struct GNUNET_ATS_Information*atsi)
2035 {
2036   struct GNUNET_STREAM_Socket *socket = cls;
2037   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2038  
2039   return handle_ack (socket, tunnel, sender, ack, atsi);
2040 }
2041
2042
2043 /**
2044  * Message Handler for mesh
2045  *
2046  * @param cls the server's listen socket
2047  * @param tunnel connection to the other end
2048  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2049  * @param sender who sent the message
2050  * @param message the actual message
2051  * @param atsi performance data for the connection
2052  * @return GNUNET_OK to keep the connection open,
2053  *         GNUNET_SYSERR to close it (signal serious error)
2054  */
2055 static int
2056 server_handle_ack (void *cls,
2057                    struct GNUNET_MESH_Tunnel *tunnel,
2058                    void **tunnel_ctx,
2059                    const struct GNUNET_PeerIdentity *sender,
2060                    const struct GNUNET_MessageHeader *message,
2061                    const struct GNUNET_ATS_Information*atsi)
2062 {
2063   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2064   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2065  
2066   return handle_ack (socket, tunnel, sender, ack, atsi);
2067 }
2068
2069
2070 /**
2071  * For client message handlers, the stream socket is in the
2072  * closure argument.
2073  */
2074 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2075   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2076   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2077    sizeof (struct GNUNET_STREAM_AckMessage) },
2078   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2079    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2080   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2081    sizeof (struct GNUNET_STREAM_MessageHeader)},
2082   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2083    sizeof (struct GNUNET_STREAM_MessageHeader)},
2084   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2085    sizeof (struct GNUNET_STREAM_MessageHeader)},
2086   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2087    sizeof (struct GNUNET_STREAM_MessageHeader)},
2088   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2089    sizeof (struct GNUNET_STREAM_MessageHeader)},
2090   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2091    sizeof (struct GNUNET_STREAM_MessageHeader)},
2092   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2093    sizeof (struct GNUNET_STREAM_MessageHeader)},
2094   {NULL, 0, 0}
2095 };
2096
2097
2098 /**
2099  * For server message handlers, the stream socket is in the
2100  * tunnel context, and the listen socket in the closure argument.
2101  */
2102 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2103   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2104   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2105    sizeof (struct GNUNET_STREAM_AckMessage) },
2106   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2107    sizeof (struct GNUNET_STREAM_MessageHeader)},
2108   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2109    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2110   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2111    sizeof (struct GNUNET_STREAM_MessageHeader)},
2112   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2113    sizeof (struct GNUNET_STREAM_MessageHeader)},
2114   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2115    sizeof (struct GNUNET_STREAM_MessageHeader)},
2116   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2117    sizeof (struct GNUNET_STREAM_MessageHeader)},
2118   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2119    sizeof (struct GNUNET_STREAM_MessageHeader)},
2120   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2121    sizeof (struct GNUNET_STREAM_MessageHeader)},
2122   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2123    sizeof (struct GNUNET_STREAM_MessageHeader)},
2124   {NULL, 0, 0}
2125 };
2126
2127
2128 /**
2129  * Function called when our target peer is connected to our tunnel
2130  *
2131  * @param cls the socket for which this tunnel is created
2132  * @param peer the peer identity of the target
2133  * @param atsi performance data for the connection
2134  */
2135 static void
2136 mesh_peer_connect_callback (void *cls,
2137                             const struct GNUNET_PeerIdentity *peer,
2138                             const struct GNUNET_ATS_Information * atsi)
2139 {
2140   struct GNUNET_STREAM_Socket *socket = cls;
2141   struct GNUNET_STREAM_MessageHeader *message;
2142   GNUNET_PEER_Id connected_peer;
2143
2144   connected_peer = GNUNET_PEER_search (peer);
2145   
2146   if (connected_peer != socket->other_peer)
2147     {
2148       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2149                   "%x: A peer which is not our target has connected",
2150                   "to our tunnel\n",
2151                   socket->our_id);
2152       return;
2153     }
2154   
2155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2156               "%x: Target peer %x connected\n", 
2157               socket->our_id,
2158               connected_peer);
2159   
2160   /* Set state to INIT */
2161   socket->state = STATE_INIT;
2162
2163   /* Send HELLO message */
2164   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2165   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2166   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2167   queue_message (socket,
2168                  message,
2169                  &set_state_hello_wait,
2170                  NULL);
2171
2172   /* Call open callback */
2173   if (NULL == socket->open_cb)
2174     {
2175       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2176                   "STREAM_open callback is NULL\n");
2177     }
2178 }
2179
2180
2181 /**
2182  * Function called when our target peer is disconnected from our tunnel
2183  *
2184  * @param cls the socket associated which this tunnel
2185  * @param peer the peer identity of the target
2186  */
2187 static void
2188 mesh_peer_disconnect_callback (void *cls,
2189                                const struct GNUNET_PeerIdentity *peer)
2190 {
2191
2192 }
2193
2194
2195 /**
2196  * Method called whenever a peer creates a tunnel to us
2197  *
2198  * @param cls closure
2199  * @param tunnel new handle to the tunnel
2200  * @param initiator peer that started the tunnel
2201  * @param atsi performance information for the tunnel
2202  * @return initial tunnel context for the tunnel
2203  *         (can be NULL -- that's not an error)
2204  */
2205 static void *
2206 new_tunnel_notify (void *cls,
2207                    struct GNUNET_MESH_Tunnel *tunnel,
2208                    const struct GNUNET_PeerIdentity *initiator,
2209                    const struct GNUNET_ATS_Information *atsi)
2210 {
2211   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2212   struct GNUNET_STREAM_Socket *socket;
2213
2214   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2215      from the same peer again until the socket is closed */
2216
2217   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2218   socket->other_peer = GNUNET_PEER_intern (initiator);
2219   socket->tunnel = tunnel;
2220   socket->session_id = 0;       /* FIXME */
2221   socket->state = STATE_INIT;
2222   socket->lsocket = lsocket;
2223   socket->our_id = lsocket->our_id;
2224   
2225   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226               "%x: Peer %x initiated tunnel to us\n", 
2227               socket->our_id,
2228               socket->other_peer);
2229   
2230   /* FIXME: Copy MESH handle from lsocket to socket */
2231   
2232   return socket;
2233 }
2234
2235
2236 /**
2237  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2238  * any associated state.  This function is NOT called if the client has
2239  * explicitly asked for the tunnel to be destroyed using
2240  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2241  * the tunnel.
2242  *
2243  * @param cls closure (set from GNUNET_MESH_connect)
2244  * @param tunnel connection to the other end (henceforth invalid)
2245  * @param tunnel_ctx place where local state associated
2246  *                   with the tunnel is stored
2247  */
2248 static void 
2249 tunnel_cleaner (void *cls,
2250                 const struct GNUNET_MESH_Tunnel *tunnel,
2251                 void *tunnel_ctx)
2252 {
2253   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2254   
2255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2256               "%x: Peer %x has terminated connection abruptly\n",
2257               socket->our_id,
2258               socket->other_peer);
2259
2260   socket->status = GNUNET_STREAM_SHUTDOWN;
2261
2262   /* Clear Transmit handles */
2263   if (NULL != socket->transmit_handle)
2264     {
2265       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2266       socket->transmit_handle = NULL;
2267     }
2268   socket->tunnel = NULL;
2269 }
2270
2271
2272 /*****************/
2273 /* API functions */
2274 /*****************/
2275
2276
2277 /**
2278  * Tries to open a stream to the target peer
2279  *
2280  * @param cfg configuration to use
2281  * @param target the target peer to which the stream has to be opened
2282  * @param app_port the application port number which uniquely identifies this
2283  *            stream
2284  * @param open_cb this function will be called after stream has be established 
2285  * @param open_cb_cls the closure for open_cb
2286  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2287  * @return if successful it returns the stream socket; NULL if stream cannot be
2288  *         opened 
2289  */
2290 struct GNUNET_STREAM_Socket *
2291 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2292                     const struct GNUNET_PeerIdentity *target,
2293                     GNUNET_MESH_ApplicationType app_port,
2294                     GNUNET_STREAM_OpenCallback open_cb,
2295                     void *open_cb_cls,
2296                     ...)
2297 {
2298   struct GNUNET_STREAM_Socket *socket;
2299   struct GNUNET_PeerIdentity own_peer_id;
2300   enum GNUNET_STREAM_Option option;
2301   va_list vargs;                /* Variable arguments */
2302
2303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2304               "%s\n", __func__);
2305
2306   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2307   socket->other_peer = GNUNET_PEER_intern (target);
2308   socket->open_cb = open_cb;
2309   socket->open_cls = open_cb_cls;
2310   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2311   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2312   
2313   /* Set defaults */
2314   socket->retransmit_timeout = 
2315     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2316
2317   va_start (vargs, open_cb_cls); /* Parse variable args */
2318   do {
2319     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2320     switch (option)
2321       {
2322       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2323         /* Expect struct GNUNET_TIME_Relative */
2324         socket->retransmit_timeout = va_arg (vargs,
2325                                              struct GNUNET_TIME_Relative);
2326         break;
2327       case GNUNET_STREAM_OPTION_END:
2328         break;
2329       }
2330   } while (GNUNET_STREAM_OPTION_END != option);
2331   va_end (vargs);               /* End of variable args parsing */
2332   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2333                                       10,  /* QUEUE size as parameter? */
2334                                       socket, /* cls */
2335                                       NULL, /* No inbound tunnel handler */
2336                                       &tunnel_cleaner, /* FIXME: not required? */
2337                                       client_message_handlers,
2338                                       &app_port); /* We don't get inbound tunnels */
2339   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2340     {
2341       GNUNET_free (socket);
2342       return NULL;
2343     }
2344
2345   /* Now create the mesh tunnel to target */
2346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2347               "Creating MESH Tunnel\n");
2348   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2349                                               NULL, /* Tunnel context */
2350                                               &mesh_peer_connect_callback,
2351                                               &mesh_peer_disconnect_callback,
2352                                               socket);
2353   GNUNET_assert (NULL != socket->tunnel);
2354   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2355                                         target);
2356   
2357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2358               "%s() END\n", __func__);
2359   return socket;
2360 }
2361
2362
2363 /**
2364  * Shutdown the stream for reading or writing (man 2 shutdown).
2365  *
2366  * @param socket the stream socket
2367  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2368  */
2369 void
2370 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2371                         int how)
2372 {
2373   return;
2374 }
2375
2376
2377 /**
2378  * Closes the stream
2379  *
2380  * @param socket the stream socket
2381  */
2382 void
2383 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2384 {
2385   struct MessageQueue *head;
2386
2387   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2388     {
2389       /* socket closed with read task pending!? */
2390       GNUNET_break (0);
2391       GNUNET_SCHEDULER_cancel (socket->read_task_id);
2392       socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2393     }
2394   
2395   /* Terminate the ack'ing tasks if they are still present */
2396   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2397     {
2398       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2399       socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2400     }
2401
2402   /* Clear Transmit handles */
2403   if (NULL != socket->transmit_handle)
2404     {
2405       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2406       socket->transmit_handle = NULL;
2407     }
2408
2409   /* Clear existing message queue */
2410   while (NULL != (head = socket->queue_head)) {
2411     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2412                                  socket->queue_tail,
2413                                  head);
2414     GNUNET_free (head->message);
2415     GNUNET_free (head);
2416   }
2417
2418   /* Close associated tunnel */
2419   if (NULL != socket->tunnel)
2420     {
2421       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2422       socket->tunnel = NULL;
2423     }
2424
2425   /* Close mesh connection */
2426   if (NULL != socket->mesh && NULL == socket->lsocket)
2427     {
2428       GNUNET_MESH_disconnect (socket->mesh);
2429       socket->mesh = NULL;
2430     }
2431   
2432   /* Release receive buffer */
2433   if (NULL != socket->receive_buffer)
2434     {
2435       GNUNET_free (socket->receive_buffer);
2436     }
2437
2438   GNUNET_free (socket);
2439 }
2440
2441
2442 /**
2443  * Listens for stream connections for a specific application ports
2444  *
2445  * @param cfg the configuration to use
2446  * @param app_port the application port for which new streams will be accepted
2447  * @param listen_cb this function will be called when a peer tries to establish
2448  *            a stream with us
2449  * @param listen_cb_cls closure for listen_cb
2450  * @return listen socket, NULL for any error
2451  */
2452 struct GNUNET_STREAM_ListenSocket *
2453 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2454                       GNUNET_MESH_ApplicationType app_port,
2455                       GNUNET_STREAM_ListenCallback listen_cb,
2456                       void *listen_cb_cls)
2457 {
2458   /* FIXME: Add variable args for passing configration options? */
2459   struct GNUNET_STREAM_ListenSocket *lsocket;
2460   struct GNUNET_PeerIdentity our_peer_id;
2461
2462   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2463   lsocket->port = app_port;
2464   lsocket->listen_cb = listen_cb;
2465   lsocket->listen_cb_cls = listen_cb_cls;
2466   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2467   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2468   lsocket->mesh = GNUNET_MESH_connect (cfg,
2469                                        10, /* FIXME: QUEUE size as parameter? */
2470                                        lsocket, /* Closure */
2471                                        &new_tunnel_notify,
2472                                        &tunnel_cleaner,
2473                                        server_message_handlers,
2474                                        &app_port);
2475   GNUNET_assert (NULL != lsocket->mesh);
2476   return lsocket;
2477 }
2478
2479
2480 /**
2481  * Closes the listen socket
2482  *
2483  * @param lsocket the listen socket
2484  */
2485 void
2486 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2487 {
2488   /* Close MESH connection */
2489   GNUNET_assert (NULL != lsocket->mesh);
2490   GNUNET_MESH_disconnect (lsocket->mesh);
2491   
2492   GNUNET_free (lsocket);
2493 }
2494
2495
2496 /**
2497  * Tries to write the given data to the stream
2498  *
2499  * @param socket the socket representing a stream
2500  * @param data the data buffer from where the data is written into the stream
2501  * @param size the number of bytes to be written from the data buffer
2502  * @param timeout the timeout period
2503  * @param write_cont the function to call upon writing some bytes into the stream
2504  * @param write_cont_cls the closure
2505  * @return handle to cancel the operation
2506  */
2507 struct GNUNET_STREAM_IOWriteHandle *
2508 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2509                      const void *data,
2510                      size_t size,
2511                      struct GNUNET_TIME_Relative timeout,
2512                      GNUNET_STREAM_CompletionContinuation write_cont,
2513                      void *write_cont_cls)
2514 {
2515   unsigned int num_needed_packets;
2516   unsigned int packet;
2517   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2518   uint32_t packet_size;
2519   uint32_t payload_size;
2520   struct GNUNET_STREAM_DataMessage *data_msg;
2521   const void *sweep;
2522   struct GNUNET_TIME_Relative ack_deadline;
2523
2524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2525               "%s\n", __func__);
2526
2527   /* Return NULL if there is already a write request pending */
2528   if (NULL != socket->write_handle)
2529   {
2530     GNUNET_break (0);
2531     return NULL;
2532   }
2533   if (!((STATE_ESTABLISHED == socket->state)
2534         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2535         || (STATE_RECEIVE_CLOSED == socket->state)))
2536     {
2537       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2538                   "%x: Attempting to write on a closed (OR) not-yet-established"
2539                   "stream\n",
2540                   socket->our_id);
2541       return NULL;
2542     } 
2543   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2544     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2545   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2546   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2547   io_handle->write_cont = write_cont;
2548   io_handle->write_cont_cls = write_cont_cls;
2549   io_handle->size = size;
2550   sweep = data;
2551   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2552      determined from RTT */
2553   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2554   /* Divide the given buffer into packets for sending */
2555   for (packet=0; packet < num_needed_packets; packet++)
2556     {
2557       if ((packet + 1) * max_payload_size < size) 
2558         {
2559           payload_size = max_payload_size;
2560           packet_size = MAX_PACKET_SIZE;
2561         }
2562       else 
2563         {
2564           payload_size = size - packet * max_payload_size;
2565           packet_size =  payload_size + sizeof (struct
2566                                                 GNUNET_STREAM_DataMessage); 
2567         }
2568       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2569       io_handle->messages[packet]->header.header.size = htons (packet_size);
2570       io_handle->messages[packet]->header.header.type =
2571         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2572       io_handle->messages[packet]->sequence_number =
2573         htonl (socket->write_sequence_number++);
2574       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2575
2576       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2577          determined from RTT */
2578       io_handle->messages[packet]->ack_deadline =
2579         GNUNET_TIME_relative_hton (ack_deadline);
2580       data_msg = io_handle->messages[packet];
2581       /* Copy data from given buffer to the packet */
2582       memcpy (&data_msg[1],
2583               sweep,
2584               payload_size);
2585       sweep += payload_size;
2586       socket->write_offset += payload_size;
2587     }
2588   socket->write_handle = io_handle;
2589   write_data (socket);
2590
2591   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2592               "%s() END\n", __func__);
2593
2594   return io_handle;
2595 }
2596
2597
2598 /**
2599  * Tries to read data from the stream
2600  *
2601  * @param socket the socket representing a stream
2602  * @param timeout the timeout period
2603  * @param proc function to call with data (once only)
2604  * @param proc_cls the closure for proc
2605  * @return handle to cancel the operation
2606  */
2607 struct GNUNET_STREAM_IOReadHandle *
2608 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2609                     struct GNUNET_TIME_Relative timeout,
2610                     GNUNET_STREAM_DataProcessor proc,
2611                     void *proc_cls)
2612 {
2613   struct GNUNET_STREAM_IOReadHandle *read_handle;
2614   
2615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2616               "%x: %s()\n", 
2617               socket->our_id,
2618               __func__);
2619
2620   /* Return NULL if there is already a read handle; the user has to cancel that
2621   first before continuing or has to wait until it is completed */
2622   if (NULL != socket->read_handle) return NULL;
2623
2624   GNUNET_assert (NULL != proc);
2625
2626   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2627   read_handle->proc = proc;
2628   read_handle->proc_cls = proc_cls;
2629   socket->read_handle = read_handle;
2630
2631   /* Check if we have a packet at bitmap 0 */
2632   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2633                                           0))
2634     {
2635       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2636                                                        socket);
2637    
2638     }
2639   
2640   /* Setup the read timeout task */
2641   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2642                                                                &read_io_timeout,
2643                                                                socket);
2644   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2645               "%x: %s() END\n",
2646               socket->our_id,
2647               __func__);
2648   return read_handle;
2649 }
2650
2651
2652 /**
2653  * Cancel pending write operation.
2654  *
2655  * @param ioh handle to operation to cancel
2656  */
2657 void
2658 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2659 {
2660   /* FIXME: Should cancel the write retransmission task */
2661   return;
2662 }
2663
2664
2665 /**
2666  * Cancel pending read operation.
2667  *
2668  * @param ioh handle to operation to cancel
2669  */
2670 void
2671 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2672 {
2673   return;
2674 }