-cleaner code
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_stream_lib.h"
42 #include "stream_protocol.h"
43
44 #define LOG(kind,...)                                   \
45   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
46
47 /**
48  * The maximum packet size of a stream packet
49  */
50 #define MAX_PACKET_SIZE 64000
51
52 /**
53  * Receive buffer
54  */
55 #define RECEIVE_BUFFER_SIZE 4096000
56
57 /**
58  * The maximum payload a data message packet can carry
59  */
60 static const size_t max_payload_size = 
61   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
62
63 /**
64  * states in the Protocol
65  */
66 enum State
67   {
68     /**
69      * Client initialization state
70      */
71     STATE_INIT,
72
73     /**
74      * Listener initialization state 
75      */
76     STATE_LISTEN,
77
78     /**
79      * Pre-connection establishment state
80      */
81     STATE_HELLO_WAIT,
82
83     /**
84      * State where a connection has been established
85      */
86     STATE_ESTABLISHED,
87
88     /**
89      * State where the socket is closed on our side and waiting to be ACK'ed
90      */
91     STATE_RECEIVE_CLOSE_WAIT,
92
93     /**
94      * State where the socket is closed for reading
95      */
96     STATE_RECEIVE_CLOSED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_TRANSMIT_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for writing
105      */
106     STATE_TRANSMIT_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed
115      */
116     STATE_CLOSED 
117   };
118
119
120 /**
121  * Functions of this type are called when a message is written
122  *
123  * @param cls the closure from queue_message
124  * @param socket the socket the written message was bound to
125  */
126 typedef void (*SendFinishCallback) (void *cls,
127                                     struct GNUNET_STREAM_Socket *socket);
128
129
130 /**
131  * The send message queue
132  */
133 struct MessageQueue
134 {
135   /**
136    * The message
137    */
138   struct GNUNET_STREAM_MessageHeader *message;
139
140   /**
141    * Callback to be called when the message is sent
142    */
143   SendFinishCallback finish_cb;
144
145   /**
146    * The closure for finish_cb
147    */
148   void *finish_cb_cls;
149
150   /**
151    * The next message in queue. Should be NULL in the last message
152    */
153   struct MessageQueue *next;
154
155   /**
156    * The next message in queue. Should be NULL in the first message
157    */
158   struct MessageQueue *prev;
159 };
160
161
162 /**
163  * The STREAM Socket Handler
164  */
165 struct GNUNET_STREAM_Socket
166 {
167   /**
168    * Retransmission timeout
169    */
170   struct GNUNET_TIME_Relative retransmit_timeout;
171
172   /**
173    * The Acknowledgement Bitmap
174    */
175   GNUNET_STREAM_AckBitmap ack_bitmap;
176
177   /**
178    * Time when the Acknowledgement was queued
179    */
180   struct GNUNET_TIME_Absolute ack_time_registered;
181
182   /**
183    * Queued Acknowledgement deadline
184    */
185   struct GNUNET_TIME_Relative ack_time_deadline;
186
187   /**
188    * The mesh handle
189    */
190   struct GNUNET_MESH_Handle *mesh;
191
192   /**
193    * The mesh tunnel handle
194    */
195   struct GNUNET_MESH_Tunnel *tunnel;
196
197   /**
198    * Stream open closure
199    */
200   void *open_cls;
201
202   /**
203    * Stream open callback
204    */
205   GNUNET_STREAM_OpenCallback open_cb;
206
207   /**
208    * The current transmit handle (if a pending transmit request exists)
209    */
210   struct GNUNET_MESH_TransmitHandle *transmit_handle;
211
212   /**
213    * The current act transmit handle (if a pending ack transmit request exists)
214    */
215   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
216
217   /**
218    * Pointer to the current ack message using in ack_task
219    */
220   struct GNUNET_STREAM_AckMessage *ack_msg;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for the read io timeout task
265    */
266   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
267
268   /**
269    * Task identifier for retransmission task after timeout
270    */
271   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * Task scheduled to continue a read operation.
280    */
281   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
282
283   /**
284    * The state of the protocol associated with this socket
285    */
286   enum State state;
287
288   /**
289    * The status of the socket
290    */
291   enum GNUNET_STREAM_Status status;
292
293   /**
294    * The number of previous timeouts; FIXME: currently not used
295    */
296   unsigned int retries;
297
298   /**
299    * The application port number (type: uint32_t)
300    */
301   GNUNET_MESH_ApplicationType app_port;
302
303   /**
304    * The session id associated with this stream connection
305    * FIXME: Not used currently, may be removed
306    */
307   uint32_t session_id;
308
309   /**
310    * Write sequence number. Set to random when sending HELLO(client) and
311    * HELLO_ACK(server) 
312    */
313   uint32_t write_sequence_number;
314
315   /**
316    * Read sequence number. This number's value is determined during handshake
317    */
318   uint32_t read_sequence_number;
319
320   /**
321    * The receiver buffer size
322    */
323   uint32_t receive_buffer_size;
324
325   /**
326    * The receiver buffer boundaries
327    */
328   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
329
330   /**
331    * receiver's available buffer after the last acknowledged packet
332    */
333   uint32_t receiver_window_available;
334
335   /**
336    * The offset pointer used during write operation
337    */
338   uint32_t write_offset;
339
340   /**
341    * The offset after which we are expecting data
342    */
343   uint32_t read_offset;
344
345   /**
346    * The offset upto which user has read from the received buffer
347    */
348   uint32_t copy_offset;
349 };
350
351
352 /**
353  * A socket for listening
354  */
355 struct GNUNET_STREAM_ListenSocket
356 {
357   /**
358    * The mesh handle
359    */
360   struct GNUNET_MESH_Handle *mesh;
361
362   /**
363    * The callback function which is called after successful opening socket
364    */
365   GNUNET_STREAM_ListenCallback listen_cb;
366
367   /**
368    * The call back closure
369    */
370   void *listen_cb_cls;
371
372   /**
373    * The service port
374    * FIXME: Remove if not required!
375    */
376   GNUNET_MESH_ApplicationType port;
377 };
378
379
380 /**
381  * The IO Write Handle
382  */
383 struct GNUNET_STREAM_IOWriteHandle
384 {
385   /**
386    * The socket to which this write handle is associated
387    */
388   struct GNUNET_STREAM_Socket *socket;
389
390   /**
391    * The packet_buffers associated with this Handle
392    */
393   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
394
395   /**
396    * The write continuation callback
397    */
398   GNUNET_STREAM_CompletionContinuation write_cont;
399
400   /**
401    * Write continuation closure
402    */
403   void *write_cont_cls;
404
405   /**
406    * The bitmap of this IOHandle; Corresponding bit for a message is set when
407    * it has been acknowledged by the receiver
408    */
409   GNUNET_STREAM_AckBitmap ack_bitmap;
410
411   /**
412    * Number of bytes in this write handle
413    */
414   size_t size;
415 };
416
417
418 /**
419  * The IO Read Handle
420  */
421 struct GNUNET_STREAM_IOReadHandle
422 {
423   /**
424    * Callback for the read processor
425    */
426   GNUNET_STREAM_DataProcessor proc;
427
428   /**
429    * The closure pointer for the read processor callback
430    */
431   void *proc_cls;
432 };
433
434
435 /**
436  * Handle for Shutdown
437  */
438 struct GNUNET_STREAM_ShutdownHandle
439 {
440   /**
441    * The socket associated with this shutdown handle
442    */
443   struct GNUNET_STREAM_Socket *socket;
444
445   /**
446    * Shutdown completion callback
447    */
448   GNUNET_STREAM_ShutdownCompletion completion_cb;
449
450   /**
451    * Closure for completion callback
452    */
453   void *completion_cls;
454
455   /**
456    * Close message retransmission task id
457    */
458   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
459
460   /**
461    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
462    */
463   int operation;  
464 };
465
466
467 /**
468  * Default value in seconds for various timeouts
469  */
470 static unsigned int default_timeout = 10;
471
472
473 /**
474  * Callback function for sending queued message
475  *
476  * @param cls closure the socket
477  * @param size number of bytes available in buf
478  * @param buf where the callee should write the message
479  * @return number of bytes written to buf
480  */
481 static size_t
482 send_message_notify (void *cls, size_t size, void *buf)
483 {
484   struct GNUNET_STREAM_Socket *socket = cls;
485   struct MessageQueue *head;
486   size_t ret;
487
488   socket->transmit_handle = NULL; /* Remove the transmit handle */
489   head = socket->queue_head;
490   if (NULL == head)
491     return 0; /* just to be safe */
492   if (0 == size)                /* request timed out */
493   {
494     socket->retries++;
495     LOG (GNUNET_ERROR_TYPE_DEBUG,
496          "%s: Message sending timed out. Retry %d \n",
497          GNUNET_i2s (&socket->other_peer),
498          socket->retries);
499     socket->transmit_handle = 
500       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
501                                          0, /* Corking */
502                                          1, /* Priority */
503                                          /* FIXME: exponential backoff */
504                                          socket->retransmit_timeout,
505                                          &socket->other_peer,
506                                          ntohs (head->message->header.size),
507                                          &send_message_notify,
508                                          socket);
509     return 0;
510   }
511
512   ret = ntohs (head->message->header.size);
513   GNUNET_assert (size >= ret);
514   memcpy (buf, head->message, ret);
515   if (NULL != head->finish_cb)
516   {
517     head->finish_cb (head->finish_cb_cls, socket);
518   }
519   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
520                                socket->queue_tail,
521                                head);
522   GNUNET_free (head->message);
523   GNUNET_free (head);
524   head = socket->queue_head;
525   if (NULL != head)    /* more pending messages to send */
526   {
527     socket->retries = 0;
528     socket->transmit_handle = 
529       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
530                                          0, /* Corking */
531                                          1, /* Priority */
532                                          /* FIXME: exponential backoff */
533                                          socket->retransmit_timeout,
534                                          &socket->other_peer,
535                                          ntohs (head->message->header.size),
536                                          &send_message_notify,
537                                          socket);
538   }
539   return ret;
540 }
541
542
543 /**
544  * Queues a message for sending using the mesh connection of a socket
545  *
546  * @param socket the socket whose mesh connection is used
547  * @param message the message to be sent
548  * @param finish_cb the callback to be called when the message is sent
549  * @param finish_cb_cls the closure for the callback
550  */
551 static void
552 queue_message (struct GNUNET_STREAM_Socket *socket,
553                struct GNUNET_STREAM_MessageHeader *message,
554                SendFinishCallback finish_cb,
555                void *finish_cb_cls)
556 {
557   struct MessageQueue *queue_entity;
558
559   GNUNET_assert 
560     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
561      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
562
563   LOG (GNUNET_ERROR_TYPE_DEBUG,
564        "%s: Queueing message of type %d and size %d\n",
565        GNUNET_i2s (&socket->other_peer),
566        ntohs (message->header.type),
567        ntohs (message->header.size));
568   GNUNET_assert (NULL != message);
569   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
570   queue_entity->message = message;
571   queue_entity->finish_cb = finish_cb;
572   queue_entity->finish_cb_cls = finish_cb_cls;
573   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
574                                     socket->queue_tail,
575                                     queue_entity);
576   if (NULL == socket->transmit_handle)
577   {
578     socket->retries = 0;
579     socket->transmit_handle = 
580       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
581                                          0, /* Corking */
582                                          1, /* Priority */
583                                          socket->retransmit_timeout,
584                                          &socket->other_peer,
585                                          ntohs (message->header.size),
586                                          &send_message_notify,
587                                          socket);
588   }
589 }
590
591
592 /**
593  * Copies a message and queues it for sending using the mesh connection of
594  * given socket 
595  *
596  * @param socket the socket whose mesh connection is used
597  * @param message the message to be sent
598  * @param finish_cb the callback to be called when the message is sent
599  * @param finish_cb_cls the closure for the callback
600  */
601 static void
602 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
603                         const struct GNUNET_STREAM_MessageHeader *message,
604                         SendFinishCallback finish_cb,
605                         void *finish_cb_cls)
606 {
607   struct GNUNET_STREAM_MessageHeader *msg_copy;
608   uint16_t size;
609   
610   size = ntohs (message->header.size);
611   msg_copy = GNUNET_malloc (size);
612   memcpy (msg_copy, message, size);
613   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
614 }
615
616
617 /**
618  * Callback function for sending ack message
619  *
620  * @param cls closure the ACK message created in ack_task
621  * @param size number of bytes available in buffer
622  * @param buf where the callee should write the message
623  * @return number of bytes written to buf
624  */
625 static size_t
626 send_ack_notify (void *cls, size_t size, void *buf)
627 {
628   struct GNUNET_STREAM_Socket *socket = cls;
629
630   if (0 == size)
631   {
632     LOG (GNUNET_ERROR_TYPE_DEBUG,
633          "%s called with size 0\n", __func__);
634     return 0;
635   }
636   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
637   
638   size = ntohs (socket->ack_msg->header.header.size);
639   memcpy (buf, socket->ack_msg, size);
640   
641   GNUNET_free (socket->ack_msg);
642   socket->ack_msg = NULL;
643   socket->ack_transmit_handle = NULL;
644   return size;
645 }
646
647 /**
648  * Writes data using the given socket. The amount of data written is limited by
649  * the receiver_window_size
650  *
651  * @param socket the socket to use
652  */
653 static void 
654 write_data (struct GNUNET_STREAM_Socket *socket);
655
656 /**
657  * Task for retransmitting data messages if they aren't ACK before their ack
658  * deadline 
659  *
660  * @param cls the socket
661  * @param tc the Task context
662  */
663 static void
664 retransmission_timeout_task (void *cls,
665                              const struct GNUNET_SCHEDULER_TaskContext *tc)
666 {
667   struct GNUNET_STREAM_Socket *socket = cls;
668   
669   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
670     return;
671
672   LOG (GNUNET_ERROR_TYPE_DEBUG,
673        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
674   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
675   write_data (socket);
676 }
677
678
679 /**
680  * Task for sending ACK message
681  *
682  * @param cls the socket
683  * @param tc the Task context
684  */
685 static void
686 ack_task (void *cls,
687           const struct GNUNET_SCHEDULER_TaskContext *tc)
688 {
689   struct GNUNET_STREAM_Socket *socket = cls;
690   struct GNUNET_STREAM_AckMessage *ack_msg;
691
692   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
693   {
694     return;
695   }
696   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
697   /* Create the ACK Message */
698   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
699   ack_msg->header.header.size = htons (sizeof (struct 
700                                                GNUNET_STREAM_AckMessage));
701   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
702   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
703   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
704   ack_msg->receive_window_remaining = 
705     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
706   socket->ack_msg = ack_msg;
707   /* Request MESH for sending ACK */
708   socket->ack_transmit_handle = 
709     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
710                                        0, /* Corking */
711                                        1, /* Priority */
712                                        socket->retransmit_timeout,
713                                        &socket->other_peer,
714                                        ntohs (ack_msg->header.header.size),
715                                        &send_ack_notify,
716                                        socket);
717 }
718
719
720 /**
721  * Retransmission task for shutdown messages
722  *
723  * @param cls the shutdown handle
724  * @param tc the Task Context
725  */
726 static void
727 close_msg_retransmission_task (void *cls,
728                                const struct GNUNET_SCHEDULER_TaskContext *tc)
729 {
730   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
731   struct GNUNET_STREAM_MessageHeader *msg;
732   struct GNUNET_STREAM_Socket *socket;
733
734   GNUNET_assert (NULL != shutdown_handle);
735   socket = shutdown_handle->socket;
736
737   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
738   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
739   switch (shutdown_handle->operation)
740   {
741   case SHUT_RDWR:
742     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
743     break;
744   case SHUT_RD:
745     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
746     break;
747   case SHUT_WR:
748     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
749     break;
750   default:
751     GNUNET_free (msg);
752     shutdown_handle->close_msg_retransmission_task_id = 
753       GNUNET_SCHEDULER_NO_TASK;
754     return;
755   }
756   queue_message (socket, msg, NULL, NULL);
757   shutdown_handle->close_msg_retransmission_task_id =
758     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
759                                   &close_msg_retransmission_task,
760                                   shutdown_handle);
761 }
762
763
764 /**
765  * Function to modify a bit in GNUNET_STREAM_AckBitmap
766  *
767  * @param bitmap the bitmap to modify
768  * @param bit the bit number to modify
769  * @param value GNUNET_YES to on, GNUNET_NO to off
770  */
771 static void
772 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
773                       unsigned int bit, 
774                       int value)
775 {
776   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
777   if (GNUNET_YES == value)
778     *bitmap |= (1LL << bit);
779   else
780     *bitmap &= ~(1LL << bit);
781 }
782
783
784 /**
785  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
786  *
787  * @param bitmap address of the bitmap that has to be checked
788  * @param bit the bit number to check
789  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
790  */
791 static uint8_t
792 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
793                       unsigned int bit)
794 {
795   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
796   return 0 != (*bitmap & (1LL << bit));
797 }
798
799
800 /**
801  * Writes data using the given socket. The amount of data written is limited by
802  * the receiver_window_size
803  *
804  * @param socket the socket to use
805  */
806 static void 
807 write_data (struct GNUNET_STREAM_Socket *socket)
808 {
809   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
810   int packet;                   /* Although an int, should never be negative */
811   int ack_packet;
812
813   ack_packet = -1;
814   /* Find the last acknowledged packet */
815   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
816   {      
817     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
818                                             packet))
819       ack_packet = packet;        
820     else if (NULL == io_handle->messages[packet])
821       break;
822   }
823   /* Resend packets which weren't ack'ed */
824   for (packet=0; packet < ack_packet; packet++)
825   {
826     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
827                                            packet))
828     {
829       LOG (GNUNET_ERROR_TYPE_DEBUG,
830            "%s: Placing DATA message with sequence %u in send queue\n",
831            GNUNET_i2s (&socket->other_peer),
832            ntohl (io_handle->messages[packet]->sequence_number));
833       copy_and_queue_message (socket,
834                               &io_handle->messages[packet]->header,
835                               NULL,
836                               NULL);
837     }
838   }
839   packet = ack_packet + 1;
840   /* Now send new packets if there is enough buffer space */
841   while ( (NULL != io_handle->messages[packet]) &&
842           (socket->receiver_window_available 
843            >= ntohs (io_handle->messages[packet]->header.header.size)) )
844   {
845     socket->receiver_window_available -= 
846       ntohs (io_handle->messages[packet]->header.header.size);
847     LOG (GNUNET_ERROR_TYPE_DEBUG,
848          "%s: Placing DATA message with sequence %u in send queue\n",
849          GNUNET_i2s (&socket->other_peer),
850          ntohl (io_handle->messages[packet]->sequence_number));
851     copy_and_queue_message (socket,
852                             &io_handle->messages[packet]->header,
853                             NULL,
854                             NULL);
855     packet++;
856   }
857   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
858     socket->retransmission_timeout_task_id = 
859       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
860                                     (GNUNET_TIME_UNIT_SECONDS, 8),
861                                     &retransmission_timeout_task,
862                                     socket);
863 }
864
865
866 /**
867  * Task for calling the read processor
868  *
869  * @param cls the socket
870  * @param tc the task context
871  */
872 static void
873 call_read_processor (void *cls,
874                      const struct GNUNET_SCHEDULER_TaskContext *tc)
875 {
876   struct GNUNET_STREAM_Socket *socket = cls;
877   size_t read_size;
878   size_t valid_read_size;
879   unsigned int packet;
880   uint32_t sequence_increase;
881   uint32_t offset_increase;
882
883   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
884   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
885     return;
886
887   if (NULL == socket->receive_buffer) 
888     return;
889
890   GNUNET_assert (NULL != socket->read_handle);
891   GNUNET_assert (NULL != socket->read_handle->proc);
892
893   /* Check the bitmap for any holes */
894   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
895   {
896     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
897                                            packet))
898       break;
899   }
900   /* We only call read processor if we have the first packet */
901   GNUNET_assert (0 < packet);
902   valid_read_size = 
903     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
904   GNUNET_assert (0 != valid_read_size);
905   /* Cancel the read_io_timeout_task */
906   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
907   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
908   /* Call the data processor */
909   LOG (GNUNET_ERROR_TYPE_DEBUG,
910        "%s: Calling read processor\n",
911        GNUNET_i2s (&socket->other_peer));
912   read_size = 
913     socket->read_handle->proc (socket->read_handle->proc_cls,
914                                socket->status,
915                                socket->receive_buffer + socket->copy_offset,
916                                valid_read_size);
917   LOG (GNUNET_ERROR_TYPE_DEBUG,
918        "%s: Read processor read %d bytes\n",
919        GNUNET_i2s (&socket->other_peer), read_size);
920   LOG (GNUNET_ERROR_TYPE_DEBUG,
921        "%s: Read processor completed successfully\n",
922        GNUNET_i2s (&socket->other_peer));
923   /* Free the read handle */
924   GNUNET_free (socket->read_handle);
925   socket->read_handle = NULL;
926   GNUNET_assert (read_size <= valid_read_size);
927   socket->copy_offset += read_size;
928   /* Determine upto which packet we can remove from the buffer */
929   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
930   {
931     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
932     { packet++; break; }
933     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
934       break;
935   }
936
937   /* If no packets can be removed we can't move the buffer */
938   if (0 == packet) return;
939   sequence_increase = packet;
940   LOG (GNUNET_ERROR_TYPE_DEBUG,
941        "%s: Sequence increase after read processor completion: %u\n",
942        GNUNET_i2s (&socket->other_peer), sequence_increase);
943
944   /* Shift the data in the receive buffer */
945   memmove (socket->receive_buffer,
946            socket->receive_buffer 
947            + socket->receive_buffer_boundaries[sequence_increase-1],
948            socket->receive_buffer_size
949            - socket->receive_buffer_boundaries[sequence_increase-1]);
950   /* Shift the bitmap */
951   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
952   /* Set read_sequence_number */
953   socket->read_sequence_number += sequence_increase;
954   /* Set read_offset */
955   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
956   socket->read_offset += offset_increase;
957   /* Fix copy_offset */
958   GNUNET_assert (offset_increase <= socket->copy_offset);
959   socket->copy_offset -= offset_increase;
960   /* Fix relative boundaries */
961   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
962   {
963     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
964     {
965       socket->receive_buffer_boundaries[packet] = 
966         socket->receive_buffer_boundaries[packet + sequence_increase] 
967         - offset_increase;
968     }
969     else
970       socket->receive_buffer_boundaries[packet] = 0;
971   }
972 }
973
974
975 /**
976  * Cancels the existing read io handle
977  *
978  * @param cls the closure from the SCHEDULER call
979  * @param tc the task context
980  */
981 static void
982 read_io_timeout (void *cls, 
983                  const struct GNUNET_SCHEDULER_TaskContext *tc)
984 {
985   struct GNUNET_STREAM_Socket *socket = cls;
986   GNUNET_STREAM_DataProcessor proc;
987   void *proc_cls;
988
989   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
990   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
991   {
992     LOG (GNUNET_ERROR_TYPE_DEBUG,
993          "%s: Read task timedout - Cancelling it\n",
994          GNUNET_i2s (&socket->other_peer));
995     GNUNET_SCHEDULER_cancel (socket->read_task_id);
996     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
997   }
998   GNUNET_assert (NULL != socket->read_handle);
999   proc = socket->read_handle->proc;
1000   proc_cls = socket->read_handle->proc_cls;
1001
1002   GNUNET_free (socket->read_handle);
1003   socket->read_handle = NULL;
1004   /* Call the read processor to signal timeout */
1005   proc (proc_cls,
1006         GNUNET_STREAM_TIMEOUT,
1007         NULL,
1008         0);
1009 }
1010
1011
1012 /**
1013  * Handler for DATA messages; Same for both client and server
1014  *
1015  * @param socket the socket through which the ack was received
1016  * @param tunnel connection to the other end
1017  * @param sender who sent the message
1018  * @param msg the data message
1019  * @param atsi performance data for the connection
1020  * @return GNUNET_OK to keep the connection open,
1021  *         GNUNET_SYSERR to close it (signal serious error)
1022  */
1023 static int
1024 handle_data (struct GNUNET_STREAM_Socket *socket,
1025              struct GNUNET_MESH_Tunnel *tunnel,
1026              const struct GNUNET_PeerIdentity *sender,
1027              const struct GNUNET_STREAM_DataMessage *msg,
1028              const struct GNUNET_ATS_Information*atsi)
1029 {
1030   const void *payload;
1031   uint32_t bytes_needed;
1032   uint32_t relative_offset;
1033   uint32_t relative_sequence_number;
1034   uint16_t size;
1035
1036   size = htons (msg->header.header.size);
1037   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1038   {
1039     GNUNET_break_op (0);
1040     return GNUNET_SYSERR;
1041   }
1042
1043   if (0 != memcmp (sender,
1044                    &socket->other_peer,
1045                    sizeof (struct GNUNET_PeerIdentity)))
1046   {
1047     LOG (GNUNET_ERROR_TYPE_DEBUG,
1048          "%s: Received DATA from non-confirming peer\n",
1049          GNUNET_i2s (&socket->other_peer));
1050     return GNUNET_YES;
1051   }
1052
1053   switch (socket->state)
1054   {
1055   case STATE_ESTABLISHED:
1056   case STATE_TRANSMIT_CLOSED:
1057   case STATE_TRANSMIT_CLOSE_WAIT:
1058       
1059     /* check if the message's sequence number is in the range we are
1060        expecting */
1061     relative_sequence_number = 
1062       ntohl (msg->sequence_number) - socket->read_sequence_number;
1063     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1064     {
1065       LOG (GNUNET_ERROR_TYPE_DEBUG,
1066            "%s: Ignoring received message with sequence number %u\n",
1067            GNUNET_i2s (&socket->other_peer),
1068            ntohl (msg->sequence_number));
1069       /* Start ACK sending task if one is not already present */
1070       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1071       {
1072         socket->ack_task_id = 
1073           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1074                                         (msg->ack_deadline),
1075                                         &ack_task,
1076                                         socket);
1077       }
1078       return GNUNET_YES;
1079     }
1080       
1081     /* Check if we have already seen this message */
1082     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1083                                             relative_sequence_number))
1084     {
1085       LOG (GNUNET_ERROR_TYPE_DEBUG,
1086            "%s: Ignoring already received message with sequence number %u\n",
1087            GNUNET_i2s (&socket->other_peer),
1088            ntohl (msg->sequence_number));
1089       /* Start ACK sending task if one is not already present */
1090       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1091       {
1092         socket->ack_task_id = 
1093           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1094                                         (msg->ack_deadline),
1095                                         &ack_task,
1096                                         socket);
1097       }
1098       return GNUNET_YES;
1099     }
1100
1101     LOG (GNUNET_ERROR_TYPE_DEBUG,
1102          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1103          GNUNET_i2s (&socket->other_peer),
1104          ntohl (msg->sequence_number),
1105          ntohs (msg->header.header.size),
1106          GNUNET_i2s (&socket->other_peer));
1107       
1108     /* Check if we have to allocate the buffer */
1109     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1110     relative_offset = ntohl (msg->offset) - socket->read_offset;
1111     bytes_needed = relative_offset + size;
1112     if (bytes_needed > socket->receive_buffer_size)
1113     {
1114       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1115       {
1116         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1117                                                  bytes_needed);
1118         socket->receive_buffer_size = bytes_needed;
1119       }
1120       else
1121       {
1122         LOG (GNUNET_ERROR_TYPE_DEBUG,
1123              "%s: Cannot accommodate packet %d as buffer is full\n",
1124              GNUNET_i2s (&socket->other_peer),
1125              ntohl (msg->sequence_number));
1126         return GNUNET_YES;
1127       }
1128     }
1129       
1130     /* Copy Data to buffer */
1131     payload = &msg[1];
1132     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1133     memcpy (socket->receive_buffer + relative_offset,
1134             payload,
1135             size);
1136     socket->receive_buffer_boundaries[relative_sequence_number] = 
1137       relative_offset + size;
1138       
1139     /* Modify the ACK bitmap */
1140     ackbitmap_modify_bit (&socket->ack_bitmap,
1141                           relative_sequence_number,
1142                           GNUNET_YES);
1143
1144     /* Start ACK sending task if one is not already present */
1145     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1146     {
1147       socket->ack_task_id = 
1148         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1149                                       (msg->ack_deadline),
1150                                       &ack_task,
1151                                       socket);
1152     }
1153
1154     if ((NULL != socket->read_handle) /* A read handle is waiting */
1155         /* There is no current read task */
1156         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1157         /* We have the first packet */
1158         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1159                                                0)))
1160     {
1161       LOG (GNUNET_ERROR_TYPE_DEBUG,
1162            "%s: Scheduling read processor\n",
1163            GNUNET_i2s (&socket->other_peer));
1164           
1165       socket->read_task_id = 
1166         GNUNET_SCHEDULER_add_now (&call_read_processor,
1167                                   socket);
1168     }
1169       
1170     break;
1171
1172   default:
1173     LOG (GNUNET_ERROR_TYPE_DEBUG,
1174          "%s: Received data message when it cannot be handled\n",
1175          GNUNET_i2s (&socket->other_peer));
1176     break;
1177   }
1178   return GNUNET_YES;
1179 }
1180
1181
1182 /**
1183  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1184  *
1185  * @param cls the socket (set from GNUNET_MESH_connect)
1186  * @param tunnel connection to the other end
1187  * @param tunnel_ctx place to store local state associated with the tunnel
1188  * @param sender who sent the message
1189  * @param message the actual message
1190  * @param atsi performance data for the connection
1191  * @return GNUNET_OK to keep the connection open,
1192  *         GNUNET_SYSERR to close it (signal serious error)
1193  */
1194 static int
1195 client_handle_data (void *cls,
1196                     struct GNUNET_MESH_Tunnel *tunnel,
1197                     void **tunnel_ctx,
1198                     const struct GNUNET_PeerIdentity *sender,
1199                     const struct GNUNET_MessageHeader *message,
1200                     const struct GNUNET_ATS_Information*atsi)
1201 {
1202   struct GNUNET_STREAM_Socket *socket = cls;
1203
1204   return handle_data (socket, 
1205                       tunnel, 
1206                       sender, 
1207                       (const struct GNUNET_STREAM_DataMessage *) message, 
1208                       atsi);
1209 }
1210
1211
1212 /**
1213  * Callback to set state to ESTABLISHED
1214  *
1215  * @param cls the closure from queue_message FIXME: document
1216  * @param socket the socket to requiring state change
1217  */
1218 static void
1219 set_state_established (void *cls,
1220                        struct GNUNET_STREAM_Socket *socket)
1221 {
1222   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1223        "%s: Attaining ESTABLISHED state\n",
1224        GNUNET_i2s (&socket->other_peer));
1225   socket->write_offset = 0;
1226   socket->read_offset = 0;
1227   socket->state = STATE_ESTABLISHED;
1228   /* FIXME: What if listen_cb is NULL */
1229   if (NULL != socket->lsocket)
1230   {
1231     LOG (GNUNET_ERROR_TYPE_DEBUG,
1232          "%s: Calling listen callback\n",
1233          GNUNET_i2s (&socket->other_peer));
1234     if (GNUNET_SYSERR == 
1235         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1236                                     socket,
1237                                     &socket->other_peer))
1238     {
1239       socket->state = STATE_CLOSED;
1240       /* FIXME: We should close in a decent way */
1241       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1242       GNUNET_free (socket);
1243     }
1244   }
1245   else if (socket->open_cb)
1246     socket->open_cb (socket->open_cls, socket);
1247 }
1248
1249
1250 /**
1251  * Callback to set state to HELLO_WAIT
1252  *
1253  * @param cls the closure from queue_message
1254  * @param socket the socket to requiring state change
1255  */
1256 static void
1257 set_state_hello_wait (void *cls,
1258                       struct GNUNET_STREAM_Socket *socket)
1259 {
1260   GNUNET_assert (STATE_INIT == socket->state);
1261   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1262        "%s: Attaining HELLO_WAIT state\n",
1263        GNUNET_i2s (&socket->other_peer));
1264   socket->state = STATE_HELLO_WAIT;
1265 }
1266
1267
1268 /**
1269  * Callback to set state to CLOSE_WAIT
1270  *
1271  * @param cls the closure from queue_message
1272  * @param socket the socket requiring state change
1273  */
1274 static void
1275 set_state_close_wait (void *cls,
1276                       struct GNUNET_STREAM_Socket *socket)
1277 {
1278   LOG (GNUNET_ERROR_TYPE_DEBUG,
1279        "%s: Attaing CLOSE_WAIT state\n",
1280        GNUNET_i2s (&socket->other_peer));
1281   socket->state = STATE_CLOSE_WAIT;
1282   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1283   socket->receive_buffer = NULL;
1284   socket->receive_buffer_size = 0;
1285 }
1286
1287
1288 /**
1289  * Callback to set state to RECEIVE_CLOSE_WAIT
1290  *
1291  * @param cls the closure from queue_message
1292  * @param socket the socket requiring state change
1293  */
1294 static void
1295 set_state_receive_close_wait (void *cls,
1296                               struct GNUNET_STREAM_Socket *socket)
1297 {
1298   LOG (GNUNET_ERROR_TYPE_DEBUG,
1299        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1300        GNUNET_i2s (&socket->other_peer));
1301   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1302   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1303   socket->receive_buffer = NULL;
1304   socket->receive_buffer_size = 0;
1305 }
1306
1307
1308 /**
1309  * Callback to set state to TRANSMIT_CLOSE_WAIT
1310  *
1311  * @param cls the closure from queue_message
1312  * @param socket the socket requiring state change
1313  */
1314 static void
1315 set_state_transmit_close_wait (void *cls,
1316                                struct GNUNET_STREAM_Socket *socket)
1317 {
1318   LOG (GNUNET_ERROR_TYPE_DEBUG,
1319        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1320        GNUNET_i2s (&socket->other_peer));
1321   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1322 }
1323
1324
1325 /**
1326  * Callback to set state to CLOSED
1327  *
1328  * @param cls the closure from queue_message
1329  * @param socket the socket requiring state change
1330  */
1331 static void
1332 set_state_closed (void *cls,
1333                   struct GNUNET_STREAM_Socket *socket)
1334 {
1335   socket->state = STATE_CLOSED;
1336 }
1337
1338 /**
1339  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1340  * socket
1341  *
1342  * @param socket the socket for which this HelloAckMessage has to be generated
1343  * @return the HelloAckMessage
1344  */
1345 static struct GNUNET_STREAM_HelloAckMessage *
1346 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1347 {
1348   struct GNUNET_STREAM_HelloAckMessage *msg;
1349
1350   /* Get the random sequence number */
1351   socket->write_sequence_number = 
1352     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1353   LOG (GNUNET_ERROR_TYPE_DEBUG,
1354        "%s: Generated write sequence number %u\n",
1355        GNUNET_i2s (&socket->other_peer),
1356        (unsigned int) socket->write_sequence_number);
1357   
1358   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1359   msg->header.header.size = 
1360     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1361   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1362   msg->sequence_number = htonl (socket->write_sequence_number);
1363   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1364
1365   return msg;
1366 }
1367
1368
1369 /**
1370  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1371  *
1372  * @param cls the socket (set from GNUNET_MESH_connect)
1373  * @param tunnel connection to the other end
1374  * @param tunnel_ctx this is NULL
1375  * @param sender who sent the message
1376  * @param message the actual message
1377  * @param atsi performance data for the connection
1378  * @return GNUNET_OK to keep the connection open,
1379  *         GNUNET_SYSERR to close it (signal serious error)
1380  */
1381 static int
1382 client_handle_hello_ack (void *cls,
1383                          struct GNUNET_MESH_Tunnel *tunnel,
1384                          void **tunnel_ctx,
1385                          const struct GNUNET_PeerIdentity *sender,
1386                          const struct GNUNET_MessageHeader *message,
1387                          const struct GNUNET_ATS_Information*atsi)
1388 {
1389   struct GNUNET_STREAM_Socket *socket = cls;
1390   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1391   struct GNUNET_STREAM_HelloAckMessage *reply;
1392
1393   if (0 != memcmp (sender,
1394                    &socket->other_peer,
1395                    sizeof (struct GNUNET_PeerIdentity)))
1396   {
1397     LOG (GNUNET_ERROR_TYPE_DEBUG,
1398          "%s: Received HELLO_ACK from non-confirming peer\n",
1399          GNUNET_i2s (&socket->other_peer));
1400     return GNUNET_YES;
1401   }
1402   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1403   LOG (GNUNET_ERROR_TYPE_DEBUG,
1404        "%s: Received HELLO_ACK from %s\n",
1405        GNUNET_i2s (&socket->other_peer),
1406        GNUNET_i2s (&socket->other_peer));
1407
1408   GNUNET_assert (socket->tunnel == tunnel);
1409   switch (socket->state)
1410   {
1411   case STATE_HELLO_WAIT:
1412     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1413     LOG (GNUNET_ERROR_TYPE_DEBUG,
1414          "%s: Read sequence number %u\n",
1415          GNUNET_i2s (&socket->other_peer),
1416          (unsigned int) socket->read_sequence_number);
1417     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1418     reply = generate_hello_ack_msg (socket);
1419     queue_message (socket,
1420                    &reply->header, 
1421                    &set_state_established, 
1422                    NULL);      
1423     return GNUNET_OK;
1424   case STATE_ESTABLISHED:
1425   case STATE_RECEIVE_CLOSE_WAIT:
1426     // call statistics (# ACKs ignored++)
1427     return GNUNET_OK;
1428   case STATE_INIT:
1429   default:
1430     LOG (GNUNET_ERROR_TYPE_DEBUG,
1431          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1432          GNUNET_i2s (&socket->other_peer),
1433          GNUNET_i2s (&socket->other_peer),
1434          socket->state);
1435     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1436     return GNUNET_SYSERR;
1437   }
1438
1439 }
1440
1441
1442 /**
1443  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1444  *
1445  * @param cls the socket (set from GNUNET_MESH_connect)
1446  * @param tunnel connection to the other end
1447  * @param tunnel_ctx this is NULL
1448  * @param sender who sent the message
1449  * @param message the actual message
1450  * @param atsi performance data for the connection
1451  * @return GNUNET_OK to keep the connection open,
1452  *         GNUNET_SYSERR to close it (signal serious error)
1453  */
1454 static int
1455 client_handle_reset (void *cls,
1456                      struct GNUNET_MESH_Tunnel *tunnel,
1457                      void **tunnel_ctx,
1458                      const struct GNUNET_PeerIdentity *sender,
1459                      const struct GNUNET_MessageHeader *message,
1460                      const struct GNUNET_ATS_Information*atsi)
1461 {
1462   // struct GNUNET_STREAM_Socket *socket = cls;
1463
1464   return GNUNET_OK;
1465 }
1466
1467
1468 /**
1469  * Common message handler for handling TRANSMIT_CLOSE messages
1470  *
1471  * @param socket the socket through which the ack was received
1472  * @param tunnel connection to the other end
1473  * @param sender who sent the message
1474  * @param msg the transmit close message
1475  * @param atsi performance data for the connection
1476  * @return GNUNET_OK to keep the connection open,
1477  *         GNUNET_SYSERR to close it (signal serious error)
1478  */
1479 static int
1480 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1481                        struct GNUNET_MESH_Tunnel *tunnel,
1482                        const struct GNUNET_PeerIdentity *sender,
1483                        const struct GNUNET_STREAM_MessageHeader *msg,
1484                        const struct GNUNET_ATS_Information*atsi)
1485 {
1486   struct GNUNET_STREAM_MessageHeader *reply;
1487
1488   switch (socket->state)
1489   {
1490   case STATE_ESTABLISHED:
1491     socket->state = STATE_RECEIVE_CLOSED;
1492
1493     /* Send TRANSMIT_CLOSE_ACK */
1494     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1495     reply->header.type = 
1496       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1497     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1498     queue_message (socket, reply, NULL, NULL);
1499     break;
1500
1501   default:
1502     /* FIXME: Call statistics? */
1503     break;
1504   }
1505   return GNUNET_YES;
1506 }
1507
1508
1509 /**
1510  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1511  *
1512  * @param cls the socket (set from GNUNET_MESH_connect)
1513  * @param tunnel connection to the other end
1514  * @param tunnel_ctx this is NULL
1515  * @param sender who sent the message
1516  * @param message the actual message
1517  * @param atsi performance data for the connection
1518  * @return GNUNET_OK to keep the connection open,
1519  *         GNUNET_SYSERR to close it (signal serious error)
1520  */
1521 static int
1522 client_handle_transmit_close (void *cls,
1523                               struct GNUNET_MESH_Tunnel *tunnel,
1524                               void **tunnel_ctx,
1525                               const struct GNUNET_PeerIdentity *sender,
1526                               const struct GNUNET_MessageHeader *message,
1527                               const struct GNUNET_ATS_Information*atsi)
1528 {
1529   struct GNUNET_STREAM_Socket *socket = cls;
1530   
1531   return handle_transmit_close (socket,
1532                                 tunnel,
1533                                 sender,
1534                                 (struct GNUNET_STREAM_MessageHeader *)message,
1535                                 atsi);
1536 }
1537
1538
1539 /**
1540  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1541  *
1542  * @param socket the socket
1543  * @param tunnel connection to the other end
1544  * @param sender who sent the message
1545  * @param message the actual message
1546  * @param atsi performance data for the connection
1547  * @param operation the close operation which is being ACK'ed
1548  * @return GNUNET_OK to keep the connection open,
1549  *         GNUNET_SYSERR to close it (signal serious error)
1550  */
1551 static int
1552 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1553                           struct GNUNET_MESH_Tunnel *tunnel,
1554                           const struct GNUNET_PeerIdentity *sender,
1555                           const struct GNUNET_STREAM_MessageHeader *message,
1556                           const struct GNUNET_ATS_Information *atsi,
1557                           int operation)
1558 {
1559   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1560
1561   shutdown_handle = socket->shutdown_handle;
1562   if (NULL == shutdown_handle)
1563   {
1564     LOG (GNUNET_ERROR_TYPE_DEBUG,
1565          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1566          GNUNET_i2s (&socket->other_peer));
1567     return GNUNET_OK;
1568   }
1569
1570   switch (operation)
1571   {
1572   case SHUT_RDWR:
1573     switch (socket->state)
1574     {
1575     case STATE_CLOSE_WAIT:
1576       if (SHUT_RDWR != shutdown_handle->operation)
1577       {
1578         LOG (GNUNET_ERROR_TYPE_DEBUG,
1579              "%s: Received CLOSE_ACK when shutdown handle is not for "
1580              "SHUT_RDWR\n",
1581              GNUNET_i2s (&socket->other_peer));
1582         return GNUNET_OK;
1583       }
1584
1585       LOG (GNUNET_ERROR_TYPE_DEBUG,
1586            "%s: Received CLOSE_ACK from %s\n",
1587            GNUNET_i2s (&socket->other_peer),
1588            GNUNET_i2s (&socket->other_peer));
1589       socket->state = STATE_CLOSED;
1590       break;
1591     default:
1592       LOG (GNUNET_ERROR_TYPE_DEBUG,
1593            "%s: Received CLOSE_ACK when in it not expected\n",
1594            GNUNET_i2s (&socket->other_peer));
1595       return GNUNET_OK;
1596     }
1597     break;
1598
1599   case SHUT_RD:
1600     switch (socket->state)
1601     {
1602     case STATE_RECEIVE_CLOSE_WAIT:
1603       if (SHUT_RD != shutdown_handle->operation)
1604       {
1605         LOG (GNUNET_ERROR_TYPE_DEBUG,
1606              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1607              "is not for SHUT_RD\n",
1608              GNUNET_i2s (&socket->other_peer));
1609         return GNUNET_OK;
1610       }
1611
1612       LOG (GNUNET_ERROR_TYPE_DEBUG,
1613            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1614            GNUNET_i2s (&socket->other_peer),
1615            GNUNET_i2s (&socket->other_peer));
1616       socket->state = STATE_RECEIVE_CLOSED;
1617       break;
1618     default:
1619       LOG (GNUNET_ERROR_TYPE_DEBUG,
1620            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1621            GNUNET_i2s (&socket->other_peer));
1622       return GNUNET_OK;
1623     }
1624
1625     break;
1626   case SHUT_WR:
1627     switch (socket->state)
1628     {
1629     case STATE_TRANSMIT_CLOSE_WAIT:
1630       if (SHUT_WR != shutdown_handle->operation)
1631       {
1632         LOG (GNUNET_ERROR_TYPE_DEBUG,
1633              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1634              "is not for SHUT_WR\n",
1635              GNUNET_i2s (&socket->other_peer));
1636         return GNUNET_OK;
1637       }
1638
1639       LOG (GNUNET_ERROR_TYPE_DEBUG,
1640            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1641            GNUNET_i2s (&socket->other_peer),
1642            GNUNET_i2s (&socket->other_peer));
1643       socket->state = STATE_TRANSMIT_CLOSED;
1644       break;
1645     default:
1646       LOG (GNUNET_ERROR_TYPE_DEBUG,
1647            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1648            GNUNET_i2s (&socket->other_peer));
1649           
1650       return GNUNET_OK;
1651     }
1652     break;
1653   default:
1654     GNUNET_assert (0);
1655   }
1656
1657   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1658     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1659                                    operation);
1660   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1661   socket->shutdown_handle = NULL;
1662   if (GNUNET_SCHEDULER_NO_TASK
1663       != shutdown_handle->close_msg_retransmission_task_id)
1664   {
1665     GNUNET_SCHEDULER_cancel
1666       (shutdown_handle->close_msg_retransmission_task_id);
1667     shutdown_handle->close_msg_retransmission_task_id =
1668       GNUNET_SCHEDULER_NO_TASK;
1669   }
1670   return GNUNET_OK;
1671 }
1672
1673
1674 /**
1675  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1676  *
1677  * @param cls the socket (set from GNUNET_MESH_connect)
1678  * @param tunnel connection to the other end
1679  * @param tunnel_ctx this is NULL
1680  * @param sender who sent the message
1681  * @param message the actual message
1682  * @param atsi performance data for the connection
1683  * @return GNUNET_OK to keep the connection open,
1684  *         GNUNET_SYSERR to close it (signal serious error)
1685  */
1686 static int
1687 client_handle_transmit_close_ack (void *cls,
1688                                   struct GNUNET_MESH_Tunnel *tunnel,
1689                                   void **tunnel_ctx,
1690                                   const struct GNUNET_PeerIdentity *sender,
1691                                   const struct GNUNET_MessageHeader *message,
1692                                   const struct GNUNET_ATS_Information*atsi)
1693 {
1694   struct GNUNET_STREAM_Socket *socket = cls;
1695
1696   return handle_generic_close_ack (socket,
1697                                    tunnel,
1698                                    sender,
1699                                    (const struct GNUNET_STREAM_MessageHeader *)
1700                                    message,
1701                                    atsi,
1702                                    SHUT_WR);
1703 }
1704
1705
1706 /**
1707  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1708  *
1709  * @param socket the socket
1710  * @param tunnel connection to the other end
1711  * @param sender who sent the message
1712  * @param message the actual message
1713  * @param atsi performance data for the connection
1714  * @return GNUNET_OK to keep the connection open,
1715  *         GNUNET_SYSERR to close it (signal serious error)
1716  */
1717 static int
1718 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1719                       struct GNUNET_MESH_Tunnel *tunnel,
1720                       const struct GNUNET_PeerIdentity *sender,
1721                       const struct GNUNET_STREAM_MessageHeader *message,
1722                       const struct GNUNET_ATS_Information *atsi)
1723 {
1724   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1725
1726   switch (socket->state)
1727   {
1728   case STATE_INIT:
1729   case STATE_LISTEN:
1730   case STATE_HELLO_WAIT:
1731     LOG (GNUNET_ERROR_TYPE_DEBUG,
1732          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1733          GNUNET_i2s (&socket->other_peer));
1734     return GNUNET_OK;
1735   default:
1736     break;
1737   }
1738   
1739   LOG (GNUNET_ERROR_TYPE_DEBUG,
1740        "%s: Received RECEIVE_CLOSE from %s\n",
1741        GNUNET_i2s (&socket->other_peer),
1742        GNUNET_i2s (&socket->other_peer));
1743   receive_close_ack =
1744     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1745   receive_close_ack->header.size =
1746     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1747   receive_close_ack->header.type =
1748     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1749   queue_message (socket,
1750                  receive_close_ack,
1751                  &set_state_closed,
1752                  NULL);
1753   
1754   /* FIXME: Handle the case where write handle is present; the write operation
1755      should be deemed as finised and the write continuation callback
1756      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1757   return GNUNET_OK;
1758 }
1759
1760
1761 /**
1762  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1763  *
1764  * @param cls the socket (set from GNUNET_MESH_connect)
1765  * @param tunnel connection to the other end
1766  * @param tunnel_ctx this is NULL
1767  * @param sender who sent the message
1768  * @param message the actual message
1769  * @param atsi performance data for the connection
1770  * @return GNUNET_OK to keep the connection open,
1771  *         GNUNET_SYSERR to close it (signal serious error)
1772  */
1773 static int
1774 client_handle_receive_close (void *cls,
1775                              struct GNUNET_MESH_Tunnel *tunnel,
1776                              void **tunnel_ctx,
1777                              const struct GNUNET_PeerIdentity *sender,
1778                              const struct GNUNET_MessageHeader *message,
1779                              const struct GNUNET_ATS_Information*atsi)
1780 {
1781   struct GNUNET_STREAM_Socket *socket = cls;
1782
1783   return
1784     handle_receive_close (socket,
1785                           tunnel,
1786                           sender,
1787                           (const struct GNUNET_STREAM_MessageHeader *) message,
1788                           atsi);
1789 }
1790
1791
1792 /**
1793  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1794  *
1795  * @param cls the socket (set from GNUNET_MESH_connect)
1796  * @param tunnel connection to the other end
1797  * @param tunnel_ctx this is NULL
1798  * @param sender who sent the message
1799  * @param message the actual message
1800  * @param atsi performance data for the connection
1801  * @return GNUNET_OK to keep the connection open,
1802  *         GNUNET_SYSERR to close it (signal serious error)
1803  */
1804 static int
1805 client_handle_receive_close_ack (void *cls,
1806                                  struct GNUNET_MESH_Tunnel *tunnel,
1807                                  void **tunnel_ctx,
1808                                  const struct GNUNET_PeerIdentity *sender,
1809                                  const struct GNUNET_MessageHeader *message,
1810                                  const struct GNUNET_ATS_Information*atsi)
1811 {
1812   struct GNUNET_STREAM_Socket *socket = cls;
1813
1814   return handle_generic_close_ack (socket,
1815                                    tunnel,
1816                                    sender,
1817                                    (const struct GNUNET_STREAM_MessageHeader *)
1818                                    message,
1819                                    atsi,
1820                                    SHUT_RD);
1821 }
1822
1823
1824 /**
1825  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1826  *
1827  * @param socket the socket
1828  * @param tunnel connection to the other end
1829  * @param sender who sent the message
1830  * @param message the actual message
1831  * @param atsi performance data for the connection
1832  * @return GNUNET_OK to keep the connection open,
1833  *         GNUNET_SYSERR to close it (signal serious error)
1834  */
1835 static int
1836 handle_close (struct GNUNET_STREAM_Socket *socket,
1837               struct GNUNET_MESH_Tunnel *tunnel,
1838               const struct GNUNET_PeerIdentity *sender,
1839               const struct GNUNET_STREAM_MessageHeader *message,
1840               const struct GNUNET_ATS_Information*atsi)
1841 {
1842   struct GNUNET_STREAM_MessageHeader *close_ack;
1843
1844   switch (socket->state)
1845   {
1846   case STATE_INIT:
1847   case STATE_LISTEN:
1848   case STATE_HELLO_WAIT:
1849     LOG (GNUNET_ERROR_TYPE_DEBUG,
1850          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1851          GNUNET_i2s (&socket->other_peer));
1852     return GNUNET_OK;
1853   default:
1854     break;
1855   }
1856
1857   LOG (GNUNET_ERROR_TYPE_DEBUG,
1858        "%s: Received CLOSE from %s\n",
1859        GNUNET_i2s (&socket->other_peer),
1860        GNUNET_i2s (&socket->other_peer));
1861   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1862   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1863   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1864   queue_message (socket,
1865                  close_ack,
1866                  &set_state_closed,
1867                  NULL);
1868   if (socket->state == STATE_CLOSED)
1869     return GNUNET_OK;
1870
1871   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1872   socket->receive_buffer = NULL;
1873   socket->receive_buffer_size = 0;
1874   return GNUNET_OK;
1875 }
1876
1877
1878 /**
1879  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1880  *
1881  * @param cls the socket (set from GNUNET_MESH_connect)
1882  * @param tunnel connection to the other end
1883  * @param tunnel_ctx this is NULL
1884  * @param sender who sent the message
1885  * @param message the actual message
1886  * @param atsi performance data for the connection
1887  * @return GNUNET_OK to keep the connection open,
1888  *         GNUNET_SYSERR to close it (signal serious error)
1889  */
1890 static int
1891 client_handle_close (void *cls,
1892                      struct GNUNET_MESH_Tunnel *tunnel,
1893                      void **tunnel_ctx,
1894                      const struct GNUNET_PeerIdentity *sender,
1895                      const struct GNUNET_MessageHeader *message,
1896                      const struct GNUNET_ATS_Information*atsi)
1897 {
1898   struct GNUNET_STREAM_Socket *socket = cls;
1899
1900   return handle_close (socket,
1901                        tunnel,
1902                        sender,
1903                        (const struct GNUNET_STREAM_MessageHeader *) message,
1904                        atsi);
1905 }
1906
1907
1908 /**
1909  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1910  *
1911  * @param cls the socket (set from GNUNET_MESH_connect)
1912  * @param tunnel connection to the other end
1913  * @param tunnel_ctx this is NULL
1914  * @param sender who sent the message
1915  * @param message the actual message
1916  * @param atsi performance data for the connection
1917  * @return GNUNET_OK to keep the connection open,
1918  *         GNUNET_SYSERR to close it (signal serious error)
1919  */
1920 static int
1921 client_handle_close_ack (void *cls,
1922                          struct GNUNET_MESH_Tunnel *tunnel,
1923                          void **tunnel_ctx,
1924                          const struct GNUNET_PeerIdentity *sender,
1925                          const struct GNUNET_MessageHeader *message,
1926                          const struct GNUNET_ATS_Information *atsi)
1927 {
1928   struct GNUNET_STREAM_Socket *socket = cls;
1929
1930   return handle_generic_close_ack (socket,
1931                                    tunnel,
1932                                    sender,
1933                                    (const struct GNUNET_STREAM_MessageHeader *) 
1934                                    message,
1935                                    atsi,
1936                                    SHUT_RDWR);
1937 }
1938
1939 /*****************************/
1940 /* Server's Message Handlers */
1941 /*****************************/
1942
1943 /**
1944  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1945  *
1946  * @param cls the closure
1947  * @param tunnel connection to the other end
1948  * @param tunnel_ctx the socket
1949  * @param sender who sent the message
1950  * @param message the actual message
1951  * @param atsi performance data for the connection
1952  * @return GNUNET_OK to keep the connection open,
1953  *         GNUNET_SYSERR to close it (signal serious error)
1954  */
1955 static int
1956 server_handle_data (void *cls,
1957                     struct GNUNET_MESH_Tunnel *tunnel,
1958                     void **tunnel_ctx,
1959                     const struct GNUNET_PeerIdentity *sender,
1960                     const struct GNUNET_MessageHeader *message,
1961                     const struct GNUNET_ATS_Information*atsi)
1962 {
1963   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1964
1965   return handle_data (socket,
1966                       tunnel,
1967                       sender,
1968                       (const struct GNUNET_STREAM_DataMessage *)message,
1969                       atsi);
1970 }
1971
1972
1973 /**
1974  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1975  *
1976  * @param cls the closure
1977  * @param tunnel connection to the other end
1978  * @param tunnel_ctx the socket
1979  * @param sender who sent the message
1980  * @param message the actual message
1981  * @param atsi performance data for the connection
1982  * @return GNUNET_OK to keep the connection open,
1983  *         GNUNET_SYSERR to close it (signal serious error)
1984  */
1985 static int
1986 server_handle_hello (void *cls,
1987                      struct GNUNET_MESH_Tunnel *tunnel,
1988                      void **tunnel_ctx,
1989                      const struct GNUNET_PeerIdentity *sender,
1990                      const struct GNUNET_MessageHeader *message,
1991                      const struct GNUNET_ATS_Information*atsi)
1992 {
1993   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1994   struct GNUNET_STREAM_HelloAckMessage *reply;
1995
1996   if (0 != memcmp (sender,
1997                    &socket->other_peer,
1998                    sizeof (struct GNUNET_PeerIdentity)))
1999   {
2000     LOG (GNUNET_ERROR_TYPE_DEBUG,
2001          "%s: Received HELLO from non-confirming peer\n",
2002          GNUNET_i2s (&socket->other_peer));
2003     return GNUNET_YES;
2004   }
2005
2006   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2007                  ntohs (message->type));
2008   GNUNET_assert (socket->tunnel == tunnel);
2009   LOG (GNUNET_ERROR_TYPE_DEBUG,
2010        "%s: Received HELLO from %s\n", 
2011        GNUNET_i2s (&socket->other_peer),
2012        GNUNET_i2s (&socket->other_peer));
2013
2014   if (STATE_INIT == socket->state)
2015   {
2016     reply = generate_hello_ack_msg (socket);
2017     queue_message (socket, 
2018                    &reply->header,
2019                    &set_state_hello_wait, 
2020                    NULL);
2021   }
2022   else
2023   {
2024     LOG (GNUNET_ERROR_TYPE_DEBUG,
2025          "%s: Client sent HELLO when in state %d\n", 
2026          GNUNET_i2s (&socket->other_peer),
2027          socket->state);
2028     /* FIXME: Send RESET? */
2029       
2030   }
2031   return GNUNET_OK;
2032 }
2033
2034
2035 /**
2036  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2037  *
2038  * @param cls the closure
2039  * @param tunnel connection to the other end
2040  * @param tunnel_ctx the socket
2041  * @param sender who sent the message
2042  * @param message the actual message
2043  * @param atsi performance data for the connection
2044  * @return GNUNET_OK to keep the connection open,
2045  *         GNUNET_SYSERR to close it (signal serious error)
2046  */
2047 static int
2048 server_handle_hello_ack (void *cls,
2049                          struct GNUNET_MESH_Tunnel *tunnel,
2050                          void **tunnel_ctx,
2051                          const struct GNUNET_PeerIdentity *sender,
2052                          const struct GNUNET_MessageHeader *message,
2053                          const struct GNUNET_ATS_Information*atsi)
2054 {
2055   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2056   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2057
2058   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2059                  ntohs (message->type));
2060   GNUNET_assert (socket->tunnel == tunnel);
2061   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2062   if (STATE_HELLO_WAIT == socket->state)
2063   {
2064     LOG (GNUNET_ERROR_TYPE_DEBUG,
2065          "%s: Received HELLO_ACK from %s\n",
2066          GNUNET_i2s (&socket->other_peer),
2067          GNUNET_i2s (&socket->other_peer));
2068     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2069     LOG (GNUNET_ERROR_TYPE_DEBUG,
2070          "%s: Read sequence number %u\n",
2071          GNUNET_i2s (&socket->other_peer),
2072          (unsigned int) socket->read_sequence_number);
2073     socket->receiver_window_available = 
2074       ntohl (ack_message->receiver_window_size);
2075     /* Attain ESTABLISHED state */
2076     set_state_established (NULL, socket);
2077   }
2078   else
2079   {
2080     LOG (GNUNET_ERROR_TYPE_DEBUG,
2081          "Client sent HELLO_ACK when in state %d\n", socket->state);
2082     /* FIXME: Send RESET? */
2083       
2084   }
2085   return GNUNET_OK;
2086 }
2087
2088
2089 /**
2090  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2091  *
2092  * @param cls the closure
2093  * @param tunnel connection to the other end
2094  * @param tunnel_ctx the socket
2095  * @param sender who sent the message
2096  * @param message the actual message
2097  * @param atsi performance data for the connection
2098  * @return GNUNET_OK to keep the connection open,
2099  *         GNUNET_SYSERR to close it (signal serious error)
2100  */
2101 static int
2102 server_handle_reset (void *cls,
2103                      struct GNUNET_MESH_Tunnel *tunnel,
2104                      void **tunnel_ctx,
2105                      const struct GNUNET_PeerIdentity *sender,
2106                      const struct GNUNET_MessageHeader *message,
2107                      const struct GNUNET_ATS_Information*atsi)
2108 {
2109   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2110
2111   return GNUNET_OK;
2112 }
2113
2114
2115 /**
2116  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2117  *
2118  * @param cls the closure
2119  * @param tunnel connection to the other end
2120  * @param tunnel_ctx the socket
2121  * @param sender who sent the message
2122  * @param message the actual message
2123  * @param atsi performance data for the connection
2124  * @return GNUNET_OK to keep the connection open,
2125  *         GNUNET_SYSERR to close it (signal serious error)
2126  */
2127 static int
2128 server_handle_transmit_close (void *cls,
2129                               struct GNUNET_MESH_Tunnel *tunnel,
2130                               void **tunnel_ctx,
2131                               const struct GNUNET_PeerIdentity *sender,
2132                               const struct GNUNET_MessageHeader *message,
2133                               const struct GNUNET_ATS_Information*atsi)
2134 {
2135   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2136
2137   return handle_transmit_close (socket,
2138                                 tunnel,
2139                                 sender,
2140                                 (struct GNUNET_STREAM_MessageHeader *)message,
2141                                 atsi);
2142 }
2143
2144
2145 /**
2146  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2147  *
2148  * @param cls the closure
2149  * @param tunnel connection to the other end
2150  * @param tunnel_ctx the socket
2151  * @param sender who sent the message
2152  * @param message the actual message
2153  * @param atsi performance data for the connection
2154  * @return GNUNET_OK to keep the connection open,
2155  *         GNUNET_SYSERR to close it (signal serious error)
2156  */
2157 static int
2158 server_handle_transmit_close_ack (void *cls,
2159                                   struct GNUNET_MESH_Tunnel *tunnel,
2160                                   void **tunnel_ctx,
2161                                   const struct GNUNET_PeerIdentity *sender,
2162                                   const struct GNUNET_MessageHeader *message,
2163                                   const struct GNUNET_ATS_Information*atsi)
2164 {
2165   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2166
2167   return handle_generic_close_ack (socket,
2168                                    tunnel,
2169                                    sender,
2170                                    (const struct GNUNET_STREAM_MessageHeader *)
2171                                    message,
2172                                    atsi,
2173                                    SHUT_WR);
2174 }
2175
2176
2177 /**
2178  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2179  *
2180  * @param cls the closure
2181  * @param tunnel connection to the other end
2182  * @param tunnel_ctx the socket
2183  * @param sender who sent the message
2184  * @param message the actual message
2185  * @param atsi performance data for the connection
2186  * @return GNUNET_OK to keep the connection open,
2187  *         GNUNET_SYSERR to close it (signal serious error)
2188  */
2189 static int
2190 server_handle_receive_close (void *cls,
2191                              struct GNUNET_MESH_Tunnel *tunnel,
2192                              void **tunnel_ctx,
2193                              const struct GNUNET_PeerIdentity *sender,
2194                              const struct GNUNET_MessageHeader *message,
2195                              const struct GNUNET_ATS_Information*atsi)
2196 {
2197   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2198
2199   return
2200     handle_receive_close (socket,
2201                           tunnel,
2202                           sender,
2203                           (const struct GNUNET_STREAM_MessageHeader *) message,
2204                           atsi);
2205 }
2206
2207
2208 /**
2209  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2210  *
2211  * @param cls the closure
2212  * @param tunnel connection to the other end
2213  * @param tunnel_ctx the socket
2214  * @param sender who sent the message
2215  * @param message the actual message
2216  * @param atsi performance data for the connection
2217  * @return GNUNET_OK to keep the connection open,
2218  *         GNUNET_SYSERR to close it (signal serious error)
2219  */
2220 static int
2221 server_handle_receive_close_ack (void *cls,
2222                                  struct GNUNET_MESH_Tunnel *tunnel,
2223                                  void **tunnel_ctx,
2224                                  const struct GNUNET_PeerIdentity *sender,
2225                                  const struct GNUNET_MessageHeader *message,
2226                                  const struct GNUNET_ATS_Information*atsi)
2227 {
2228   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2229
2230   return handle_generic_close_ack (socket,
2231                                    tunnel,
2232                                    sender,
2233                                    (const struct GNUNET_STREAM_MessageHeader *)
2234                                    message,
2235                                    atsi,
2236                                    SHUT_RD);
2237 }
2238
2239
2240 /**
2241  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2242  *
2243  * @param cls the listen socket (from GNUNET_MESH_connect in
2244  *          GNUNET_STREAM_listen) 
2245  * @param tunnel connection to the other end
2246  * @param tunnel_ctx the socket
2247  * @param sender who sent the message
2248  * @param message the actual message
2249  * @param atsi performance data for the connection
2250  * @return GNUNET_OK to keep the connection open,
2251  *         GNUNET_SYSERR to close it (signal serious error)
2252  */
2253 static int
2254 server_handle_close (void *cls,
2255                      struct GNUNET_MESH_Tunnel *tunnel,
2256                      void **tunnel_ctx,
2257                      const struct GNUNET_PeerIdentity *sender,
2258                      const struct GNUNET_MessageHeader *message,
2259                      const struct GNUNET_ATS_Information*atsi)
2260 {
2261   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2262   
2263   return handle_close (socket,
2264                        tunnel,
2265                        sender,
2266                        (const struct GNUNET_STREAM_MessageHeader *) message,
2267                        atsi);
2268 }
2269
2270
2271 /**
2272  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2273  *
2274  * @param cls the closure
2275  * @param tunnel connection to the other end
2276  * @param tunnel_ctx the socket
2277  * @param sender who sent the message
2278  * @param message the actual message
2279  * @param atsi performance data for the connection
2280  * @return GNUNET_OK to keep the connection open,
2281  *         GNUNET_SYSERR to close it (signal serious error)
2282  */
2283 static int
2284 server_handle_close_ack (void *cls,
2285                          struct GNUNET_MESH_Tunnel *tunnel,
2286                          void **tunnel_ctx,
2287                          const struct GNUNET_PeerIdentity *sender,
2288                          const struct GNUNET_MessageHeader *message,
2289                          const struct GNUNET_ATS_Information*atsi)
2290 {
2291   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2292
2293   return handle_generic_close_ack (socket,
2294                                    tunnel,
2295                                    sender,
2296                                    (const struct GNUNET_STREAM_MessageHeader *) 
2297                                    message,
2298                                    atsi,
2299                                    SHUT_RDWR);
2300 }
2301
2302
2303 /**
2304  * Handler for DATA_ACK messages
2305  *
2306  * @param socket the socket through which the ack was received
2307  * @param tunnel connection to the other end
2308  * @param sender who sent the message
2309  * @param ack the acknowledgment message
2310  * @param atsi performance data for the connection
2311  * @return GNUNET_OK to keep the connection open,
2312  *         GNUNET_SYSERR to close it (signal serious error)
2313  */
2314 static int
2315 handle_ack (struct GNUNET_STREAM_Socket *socket,
2316             struct GNUNET_MESH_Tunnel *tunnel,
2317             const struct GNUNET_PeerIdentity *sender,
2318             const struct GNUNET_STREAM_AckMessage *ack,
2319             const struct GNUNET_ATS_Information*atsi)
2320 {
2321   unsigned int packet;
2322   int need_retransmission;
2323   
2324
2325   if (0 != memcmp (sender,
2326                    &socket->other_peer,
2327                    sizeof (struct GNUNET_PeerIdentity)))
2328   {
2329     LOG (GNUNET_ERROR_TYPE_DEBUG,
2330          "%s: Received ACK from non-confirming peer\n",
2331          GNUNET_i2s (&socket->other_peer));
2332     return GNUNET_YES;
2333   }
2334
2335   switch (socket->state)
2336   {
2337   case (STATE_ESTABLISHED):
2338   case (STATE_RECEIVE_CLOSED):
2339   case (STATE_RECEIVE_CLOSE_WAIT):
2340     if (NULL == socket->write_handle)
2341     {
2342       LOG (GNUNET_ERROR_TYPE_DEBUG,
2343            "%s: Received DATA_ACK when write_handle is NULL\n",
2344            GNUNET_i2s (&socket->other_peer));
2345       return GNUNET_OK;
2346     }
2347     /* FIXME: increment in the base sequence number is breaking current flow
2348      */
2349     if (!((socket->write_sequence_number 
2350            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2351     {
2352       LOG (GNUNET_ERROR_TYPE_DEBUG,
2353            "%s: Received DATA_ACK with unexpected base sequence number\n",
2354            GNUNET_i2s (&socket->other_peer));
2355       LOG (GNUNET_ERROR_TYPE_DEBUG,
2356            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2357            GNUNET_i2s (&socket->other_peer),
2358            socket->write_sequence_number,
2359            ntohl (ack->base_sequence_number));
2360       return GNUNET_OK;
2361     }
2362     /* FIXME: include the case when write_handle is cancelled - ignore the 
2363        acks */
2364
2365     LOG (GNUNET_ERROR_TYPE_DEBUG,
2366          "%s: Received DATA_ACK from %s\n",
2367          GNUNET_i2s (&socket->other_peer),
2368          GNUNET_i2s (&socket->other_peer));
2369       
2370     /* Cancel the retransmission task */
2371     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2372     {
2373       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2374       socket->retransmission_timeout_task_id = 
2375         GNUNET_SCHEDULER_NO_TASK;
2376     }
2377
2378     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2379     {
2380       if (NULL == socket->write_handle->messages[packet]) break;
2381       if (ntohl (ack->base_sequence_number)
2382           >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2383         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2384                               packet,
2385                               GNUNET_YES);
2386       else
2387         if (GNUNET_YES == 
2388             ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2389                                   ntohl (socket->write_handle->messages[packet]->sequence_number)
2390                                   - ntohl (ack->base_sequence_number)))
2391           ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2392                                 packet,
2393                                 GNUNET_YES);
2394     }
2395
2396     /* Update the receive window remaining
2397        FIXME : Should update with the value from a data ack with greater
2398        sequence number */
2399     socket->receiver_window_available = 
2400       ntohl (ack->receive_window_remaining);
2401
2402     /* Check if we have received all acknowledgements */
2403     need_retransmission = GNUNET_NO;
2404     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2405     {
2406       if (NULL == socket->write_handle->messages[packet]) break;
2407       if (GNUNET_YES != ackbitmap_is_bit_set 
2408           (&socket->write_handle->ack_bitmap,packet))
2409       {
2410         need_retransmission = GNUNET_YES;
2411         break;
2412       }
2413     }
2414     if (GNUNET_YES == need_retransmission)
2415     {
2416       write_data (socket);
2417     }
2418     else      /* We have to call the write continuation callback now */
2419     {
2420       /* Free the packets */
2421       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2422       {
2423         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2424       }
2425       if (NULL != socket->write_handle->write_cont)
2426         socket->write_handle->write_cont
2427           (socket->write_handle->write_cont_cls,
2428            socket->status,
2429            socket->write_handle->size);
2430       LOG (GNUNET_ERROR_TYPE_DEBUG,
2431            "%s: Write completion callback completed\n",
2432            GNUNET_i2s (&socket->other_peer));
2433       /* We are done with the write handle - Freeing it */
2434       GNUNET_free (socket->write_handle);
2435       socket->write_handle = NULL;
2436     }
2437     break;
2438   default:
2439     break;
2440   }
2441   return GNUNET_OK;
2442 }
2443
2444
2445 /**
2446  * Handler for DATA_ACK messages
2447  *
2448  * @param cls the 'struct GNUNET_STREAM_Socket'
2449  * @param tunnel connection to the other end
2450  * @param tunnel_ctx unused
2451  * @param sender who sent the message
2452  * @param message the actual message
2453  * @param atsi performance data for the connection
2454  * @return GNUNET_OK to keep the connection open,
2455  *         GNUNET_SYSERR to close it (signal serious error)
2456  */
2457 static int
2458 client_handle_ack (void *cls,
2459                    struct GNUNET_MESH_Tunnel *tunnel,
2460                    void **tunnel_ctx,
2461                    const struct GNUNET_PeerIdentity *sender,
2462                    const struct GNUNET_MessageHeader *message,
2463                    const struct GNUNET_ATS_Information*atsi)
2464 {
2465   struct GNUNET_STREAM_Socket *socket = cls;
2466   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2467  
2468   return handle_ack (socket, tunnel, sender, ack, atsi);
2469 }
2470
2471
2472 /**
2473  * Handler for DATA_ACK messages
2474  *
2475  * @param cls the server's listen socket
2476  * @param tunnel connection to the other end
2477  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2478  * @param sender who sent the message
2479  * @param message the actual message
2480  * @param atsi performance data for the connection
2481  * @return GNUNET_OK to keep the connection open,
2482  *         GNUNET_SYSERR to close it (signal serious error)
2483  */
2484 static int
2485 server_handle_ack (void *cls,
2486                    struct GNUNET_MESH_Tunnel *tunnel,
2487                    void **tunnel_ctx,
2488                    const struct GNUNET_PeerIdentity *sender,
2489                    const struct GNUNET_MessageHeader *message,
2490                    const struct GNUNET_ATS_Information*atsi)
2491 {
2492   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2493   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2494  
2495   return handle_ack (socket, tunnel, sender, ack, atsi);
2496 }
2497
2498
2499 /**
2500  * For client message handlers, the stream socket is in the
2501  * closure argument.
2502  */
2503 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2504   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2505   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2506    sizeof (struct GNUNET_STREAM_AckMessage) },
2507   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2508    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2509   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2510    sizeof (struct GNUNET_STREAM_MessageHeader)},
2511   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2512    sizeof (struct GNUNET_STREAM_MessageHeader)},
2513   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2514    sizeof (struct GNUNET_STREAM_MessageHeader)},
2515   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2516    sizeof (struct GNUNET_STREAM_MessageHeader)},
2517   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2518    sizeof (struct GNUNET_STREAM_MessageHeader)},
2519   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2520    sizeof (struct GNUNET_STREAM_MessageHeader)},
2521   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2522    sizeof (struct GNUNET_STREAM_MessageHeader)},
2523   {NULL, 0, 0}
2524 };
2525
2526
2527 /**
2528  * For server message handlers, the stream socket is in the
2529  * tunnel context, and the listen socket in the closure argument.
2530  */
2531 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2532   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2533   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2534    sizeof (struct GNUNET_STREAM_AckMessage) },
2535   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2536    sizeof (struct GNUNET_STREAM_MessageHeader)},
2537   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2538    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2539   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2540    sizeof (struct GNUNET_STREAM_MessageHeader)},
2541   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2542    sizeof (struct GNUNET_STREAM_MessageHeader)},
2543   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2544    sizeof (struct GNUNET_STREAM_MessageHeader)},
2545   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2546    sizeof (struct GNUNET_STREAM_MessageHeader)},
2547   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2548    sizeof (struct GNUNET_STREAM_MessageHeader)},
2549   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2550    sizeof (struct GNUNET_STREAM_MessageHeader)},
2551   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2552    sizeof (struct GNUNET_STREAM_MessageHeader)},
2553   {NULL, 0, 0}
2554 };
2555
2556
2557 /**
2558  * Function called when our target peer is connected to our tunnel
2559  *
2560  * @param cls the socket for which this tunnel is created
2561  * @param peer the peer identity of the target
2562  * @param atsi performance data for the connection
2563  */
2564 static void
2565 mesh_peer_connect_callback (void *cls,
2566                             const struct GNUNET_PeerIdentity *peer,
2567                             const struct GNUNET_ATS_Information * atsi)
2568 {
2569   struct GNUNET_STREAM_Socket *socket = cls;
2570   struct GNUNET_STREAM_MessageHeader *message;
2571   
2572   if (0 != memcmp (peer,
2573                    &socket->other_peer,
2574                    sizeof (struct GNUNET_PeerIdentity)))
2575   {
2576     LOG (GNUNET_ERROR_TYPE_DEBUG,
2577          "%s: A peer which is not our target has connected to our tunnel\n",
2578          GNUNET_i2s(peer));
2579     return;
2580   }
2581   
2582   LOG (GNUNET_ERROR_TYPE_DEBUG,
2583        "%s: Target peer %s connected\n",
2584        GNUNET_i2s (&socket->other_peer),
2585        GNUNET_i2s (&socket->other_peer));
2586   
2587   /* Set state to INIT */
2588   socket->state = STATE_INIT;
2589
2590   /* Send HELLO message */
2591   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2592   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2593   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2594   queue_message (socket,
2595                  message,
2596                  &set_state_hello_wait,
2597                  NULL);
2598
2599   /* Call open callback */
2600   if (NULL == socket->open_cb)
2601   {
2602     LOG (GNUNET_ERROR_TYPE_DEBUG,
2603          "STREAM_open callback is NULL\n");
2604   }
2605 }
2606
2607
2608 /**
2609  * Function called when our target peer is disconnected from our tunnel
2610  *
2611  * @param cls the socket associated which this tunnel
2612  * @param peer the peer identity of the target
2613  */
2614 static void
2615 mesh_peer_disconnect_callback (void *cls,
2616                                const struct GNUNET_PeerIdentity *peer)
2617 {
2618   struct GNUNET_STREAM_Socket *socket=cls;
2619   
2620   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2621   LOG (GNUNET_ERROR_TYPE_DEBUG,
2622        "%s: Other peer %s disconnected \n",
2623        GNUNET_i2s (&socket->other_peer),
2624        GNUNET_i2s (&socket->other_peer));
2625 }
2626
2627
2628 /**
2629  * Method called whenever a peer creates a tunnel to us
2630  *
2631  * @param cls closure
2632  * @param tunnel new handle to the tunnel
2633  * @param initiator peer that started the tunnel
2634  * @param atsi performance information for the tunnel
2635  * @return initial tunnel context for the tunnel
2636  *         (can be NULL -- that's not an error)
2637  */
2638 static void *
2639 new_tunnel_notify (void *cls,
2640                    struct GNUNET_MESH_Tunnel *tunnel,
2641                    const struct GNUNET_PeerIdentity *initiator,
2642                    const struct GNUNET_ATS_Information *atsi)
2643 {
2644   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2645   struct GNUNET_STREAM_Socket *socket;
2646
2647   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2648      from the same peer again until the socket is closed */
2649
2650   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2651   socket->other_peer = *initiator;
2652   socket->tunnel = tunnel;
2653   socket->session_id = 0;       /* FIXME */
2654   socket->state = STATE_INIT;
2655   socket->lsocket = lsocket;
2656   
2657   LOG (GNUNET_ERROR_TYPE_DEBUG,
2658        "%s: Peer %s initiated tunnel to us\n", 
2659        GNUNET_i2s (&socket->other_peer),
2660        GNUNET_i2s (&socket->other_peer));
2661   
2662   /* FIXME: Copy MESH handle from lsocket to socket */
2663   
2664   return socket;
2665 }
2666
2667
2668 /**
2669  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2670  * any associated state.  This function is NOT called if the client has
2671  * explicitly asked for the tunnel to be destroyed using
2672  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2673  * the tunnel.
2674  *
2675  * @param cls closure (set from GNUNET_MESH_connect)
2676  * @param tunnel connection to the other end (henceforth invalid)
2677  * @param tunnel_ctx place where local state associated
2678  *                   with the tunnel is stored
2679  */
2680 static void 
2681 tunnel_cleaner (void *cls,
2682                 const struct GNUNET_MESH_Tunnel *tunnel,
2683                 void *tunnel_ctx)
2684 {
2685   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2686
2687   if (tunnel != socket->tunnel)
2688     return;
2689
2690   GNUNET_break_op(0);
2691   LOG (GNUNET_ERROR_TYPE_DEBUG,
2692        "%s: Peer %s has terminated connection abruptly\n",
2693        GNUNET_i2s (&socket->other_peer),
2694        GNUNET_i2s (&socket->other_peer));
2695
2696   socket->status = GNUNET_STREAM_SHUTDOWN;
2697
2698   /* Clear Transmit handles */
2699   if (NULL != socket->transmit_handle)
2700   {
2701     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2702     socket->transmit_handle = NULL;
2703   }
2704   if (NULL != socket->ack_transmit_handle)
2705   {
2706     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2707     GNUNET_free (socket->ack_msg);
2708     socket->ack_msg = NULL;
2709     socket->ack_transmit_handle = NULL;
2710   }
2711   /* Stop Tasks using socket->tunnel */
2712   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2713   {
2714     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2715     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2716   }
2717   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2718   {
2719     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2720     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2721   }
2722   /* FIXME: Cancel all other tasks using socket->tunnel */
2723   socket->tunnel = NULL;
2724 }
2725
2726
2727 /*****************/
2728 /* API functions */
2729 /*****************/
2730
2731
2732 /**
2733  * Tries to open a stream to the target peer
2734  *
2735  * @param cfg configuration to use
2736  * @param target the target peer to which the stream has to be opened
2737  * @param app_port the application port number which uniquely identifies this
2738  *            stream
2739  * @param open_cb this function will be called after stream has be established 
2740  * @param open_cb_cls the closure for open_cb
2741  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2742  * @return if successful it returns the stream socket; NULL if stream cannot be
2743  *         opened 
2744  */
2745 struct GNUNET_STREAM_Socket *
2746 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2747                     const struct GNUNET_PeerIdentity *target,
2748                     GNUNET_MESH_ApplicationType app_port,
2749                     GNUNET_STREAM_OpenCallback open_cb,
2750                     void *open_cb_cls,
2751                     ...)
2752 {
2753   struct GNUNET_STREAM_Socket *socket;
2754   enum GNUNET_STREAM_Option option;
2755   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2756   va_list vargs;                /* Variable arguments */
2757
2758   LOG (GNUNET_ERROR_TYPE_DEBUG,
2759        "%s\n", __func__);
2760   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2761   socket->other_peer = *target;
2762   socket->open_cb = open_cb;
2763   socket->open_cls = open_cb_cls;
2764   /* Set defaults */
2765   socket->retransmit_timeout = 
2766     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2767   va_start (vargs, open_cb_cls); /* Parse variable args */
2768   do {
2769     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2770     switch (option)
2771     {
2772     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2773       /* Expect struct GNUNET_TIME_Relative */
2774       socket->retransmit_timeout = va_arg (vargs,
2775                                            struct GNUNET_TIME_Relative);
2776       break;
2777     case GNUNET_STREAM_OPTION_END:
2778       break;
2779     }
2780   } while (GNUNET_STREAM_OPTION_END != option);
2781   va_end (vargs);               /* End of variable args parsing */
2782   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2783                                       10,  /* QUEUE size as parameter? */
2784                                       socket, /* cls */
2785                                       NULL, /* No inbound tunnel handler */
2786                                       NULL, /* No in-tunnel cleaner */
2787                                       client_message_handlers,
2788                                       ports); /* We don't get inbound tunnels */
2789   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2790   {
2791     GNUNET_free (socket);
2792     return NULL;
2793   }
2794
2795   /* Now create the mesh tunnel to target */
2796   LOG (GNUNET_ERROR_TYPE_DEBUG,
2797        "Creating MESH Tunnel\n");
2798   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2799                                               NULL, /* Tunnel context */
2800                                               &mesh_peer_connect_callback,
2801                                               &mesh_peer_disconnect_callback,
2802                                               socket);
2803   GNUNET_assert (NULL != socket->tunnel);
2804   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2805                                         &socket->other_peer);
2806   
2807   LOG (GNUNET_ERROR_TYPE_DEBUG,
2808        "%s() END\n", __func__);
2809   return socket;
2810 }
2811
2812
2813 /**
2814  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2815  *
2816  * @param socket the stream socket
2817  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2818  * @param completion_cb the callback that will be called upon successful
2819  *          shutdown of given operation
2820  * @param completion_cls the closure for the completion callback
2821  * @return the shutdown handle
2822  */
2823 struct GNUNET_STREAM_ShutdownHandle *
2824 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2825                         int operation,
2826                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2827                         void *completion_cls)
2828 {
2829   struct GNUNET_STREAM_ShutdownHandle *handle;
2830   struct GNUNET_STREAM_MessageHeader *msg;
2831   
2832   GNUNET_assert (NULL == socket->shutdown_handle);
2833
2834   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2835   handle->socket = socket;
2836   handle->completion_cb = completion_cb;
2837   handle->completion_cls = completion_cls;
2838   socket->shutdown_handle = handle;
2839
2840   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2841   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2842   switch (operation)
2843   {
2844   case SHUT_RD:
2845     handle->operation = SHUT_RD;
2846     if (NULL != socket->read_handle)
2847       LOG (GNUNET_ERROR_TYPE_WARNING,
2848            "Existing read handle should be cancelled before shutting"
2849            " down reading\n");
2850     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2851     queue_message (socket,
2852                    msg,
2853                    &set_state_receive_close_wait,
2854                    NULL);
2855     break;
2856   case SHUT_WR:
2857     handle->operation = SHUT_WR;
2858     if (NULL != socket->write_handle)
2859       LOG (GNUNET_ERROR_TYPE_WARNING,
2860            "Existing write handle should be cancelled before shutting"
2861            " down writing\n");
2862     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2863     queue_message (socket,
2864                    msg,
2865                    &set_state_transmit_close_wait,
2866                    NULL);
2867     break;
2868   case SHUT_RDWR:
2869     handle->operation = SHUT_RDWR;
2870     if (NULL != socket->write_handle)
2871       LOG (GNUNET_ERROR_TYPE_WARNING,
2872            "Existing write handle should be cancelled before shutting"
2873            " down writing\n");
2874     if (NULL != socket->read_handle)
2875       LOG (GNUNET_ERROR_TYPE_WARNING,
2876            "Existing read handle should be cancelled before shutting"
2877            " down reading\n");
2878     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2879     queue_message (socket,
2880                    msg,
2881                    &set_state_close_wait,
2882                    NULL);
2883     break;
2884   default:
2885     LOG (GNUNET_ERROR_TYPE_WARNING,
2886          "GNUNET_STREAM_shutdown called with invalid value for "
2887          "parameter operation -- Ignoring\n");
2888     GNUNET_free (msg);
2889     GNUNET_free (handle);
2890     return NULL;
2891   }
2892   handle->close_msg_retransmission_task_id =
2893     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2894                                   &close_msg_retransmission_task,
2895                                   handle);
2896   return handle;
2897 }
2898
2899
2900 /**
2901  * Cancels a pending shutdown
2902  *
2903  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2904  */
2905 void
2906 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2907 {
2908   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2909     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2910   GNUNET_free (handle);
2911   return;
2912 }
2913
2914
2915 /**
2916  * Closes the stream
2917  *
2918  * @param socket the stream socket
2919  */
2920 void
2921 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2922 {
2923   struct MessageQueue *head;
2924
2925   if (NULL != socket->read_handle)
2926   {
2927     LOG (GNUNET_ERROR_TYPE_WARNING,
2928          "Closing STREAM socket when a read handle is pending\n");
2929   }
2930   if (NULL != socket->write_handle)
2931   {
2932     LOG (GNUNET_ERROR_TYPE_WARNING,
2933          "Closing STREAM socket when a write handle is pending\n");
2934     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2935     //socket->write_handle = NULL;
2936   }
2937
2938   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2939   {
2940     /* socket closed with read task pending!? */
2941     GNUNET_break (0);
2942     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2943     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2944   }
2945   
2946   /* Terminate the ack'ing tasks if they are still present */
2947   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2948   {
2949     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2950     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2951   }
2952
2953   /* Clear Transmit handles */
2954   if (NULL != socket->transmit_handle)
2955   {
2956     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2957     socket->transmit_handle = NULL;
2958   }
2959   if (NULL != socket->ack_transmit_handle)
2960   {
2961     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2962     GNUNET_free (socket->ack_msg);
2963     socket->ack_msg = NULL;
2964     socket->ack_transmit_handle = NULL;
2965   }
2966
2967   /* Clear existing message queue */
2968   while (NULL != (head = socket->queue_head)) {
2969     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2970                                  socket->queue_tail,
2971                                  head);
2972     GNUNET_free (head->message);
2973     GNUNET_free (head);
2974   }
2975
2976   /* Close associated tunnel */
2977   if (NULL != socket->tunnel)
2978   {
2979     GNUNET_MESH_tunnel_destroy (socket->tunnel);
2980     socket->tunnel = NULL;
2981   }
2982
2983   /* Close mesh connection */
2984   if (NULL != socket->mesh && NULL == socket->lsocket)
2985   {
2986     GNUNET_MESH_disconnect (socket->mesh);
2987     socket->mesh = NULL;
2988   }
2989   
2990   /* Release receive buffer */
2991   if (NULL != socket->receive_buffer)
2992   {
2993     GNUNET_free (socket->receive_buffer);
2994   }
2995
2996   GNUNET_free (socket);
2997 }
2998
2999
3000 /**
3001  * Listens for stream connections for a specific application ports
3002  *
3003  * @param cfg the configuration to use
3004  * @param app_port the application port for which new streams will be accepted
3005  * @param listen_cb this function will be called when a peer tries to establish
3006  *            a stream with us
3007  * @param listen_cb_cls closure for listen_cb
3008  * @return listen socket, NULL for any error
3009  */
3010 struct GNUNET_STREAM_ListenSocket *
3011 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3012                       GNUNET_MESH_ApplicationType app_port,
3013                       GNUNET_STREAM_ListenCallback listen_cb,
3014                       void *listen_cb_cls)
3015 {
3016   /* FIXME: Add variable args for passing configration options? */
3017   struct GNUNET_STREAM_ListenSocket *lsocket;
3018   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3019
3020   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3021   lsocket->port = app_port;
3022   lsocket->listen_cb = listen_cb;
3023   lsocket->listen_cb_cls = listen_cb_cls;
3024   lsocket->mesh = GNUNET_MESH_connect (cfg,
3025                                        10, /* FIXME: QUEUE size as parameter? */
3026                                        lsocket, /* Closure */
3027                                        &new_tunnel_notify,
3028                                        &tunnel_cleaner,
3029                                        server_message_handlers,
3030                                        ports);
3031   GNUNET_assert (NULL != lsocket->mesh);
3032   return lsocket;
3033 }
3034
3035
3036 /**
3037  * Closes the listen socket
3038  *
3039  * @param lsocket the listen socket
3040  */
3041 void
3042 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3043 {
3044   /* Close MESH connection */
3045   GNUNET_assert (NULL != lsocket->mesh);
3046   GNUNET_MESH_disconnect (lsocket->mesh);
3047   
3048   GNUNET_free (lsocket);
3049 }
3050
3051
3052 /**
3053  * Tries to write the given data to the stream. The maximum size of data that
3054  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3055  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3056  * violation, however only the said number of maximum bytes will be written.
3057  *
3058  * @param socket the socket representing a stream
3059  * @param data the data buffer from where the data is written into the stream
3060  * @param size the number of bytes to be written from the data buffer
3061  * @param timeout the timeout period
3062  * @param write_cont the function to call upon writing some bytes into the
3063  *          stream 
3064  * @param write_cont_cls the closure
3065  *
3066  * @return handle to cancel the operation; if a previous write is pending or
3067  *           the stream has been shutdown for this operation then write_cont is
3068  *           immediately called and NULL is returned.
3069  */
3070 struct GNUNET_STREAM_IOWriteHandle *
3071 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3072                      const void *data,
3073                      size_t size,
3074                      struct GNUNET_TIME_Relative timeout,
3075                      GNUNET_STREAM_CompletionContinuation write_cont,
3076                      void *write_cont_cls)
3077 {
3078   unsigned int num_needed_packets;
3079   unsigned int packet;
3080   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3081   uint32_t packet_size;
3082   uint32_t payload_size;
3083   struct GNUNET_STREAM_DataMessage *data_msg;
3084   const void *sweep;
3085   struct GNUNET_TIME_Relative ack_deadline;
3086
3087   LOG (GNUNET_ERROR_TYPE_DEBUG,
3088        "%s\n", __func__);
3089
3090   /* Return NULL if there is already a write request pending */
3091   if (NULL != socket->write_handle)
3092   {
3093     GNUNET_break (0);
3094     return NULL;
3095   }
3096
3097   switch (socket->state)
3098   {
3099   case STATE_TRANSMIT_CLOSED:
3100   case STATE_TRANSMIT_CLOSE_WAIT:
3101   case STATE_CLOSED:
3102   case STATE_CLOSE_WAIT:
3103     if (NULL != write_cont)
3104       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3105     LOG (GNUNET_ERROR_TYPE_DEBUG,
3106          "%s() END\n", __func__);
3107     return NULL;
3108   case STATE_INIT:
3109   case STATE_LISTEN:
3110   case STATE_HELLO_WAIT:
3111     if (NULL != write_cont)
3112       /* FIXME: GNUNET_STREAM_SYSERR?? */
3113       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3114     LOG (GNUNET_ERROR_TYPE_DEBUG,
3115          "%s() END\n", __func__);
3116     return NULL;
3117   case STATE_ESTABLISHED:
3118   case STATE_RECEIVE_CLOSED:
3119   case STATE_RECEIVE_CLOSE_WAIT:
3120     break;
3121   }
3122
3123   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3124     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3125   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3126   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3127   io_handle->socket = socket;
3128   io_handle->write_cont = write_cont;
3129   io_handle->write_cont_cls = write_cont_cls;
3130   io_handle->size = size;
3131   sweep = data;
3132   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3133      determined from RTT */
3134   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3135   /* Divide the given buffer into packets for sending */
3136   for (packet=0; packet < num_needed_packets; packet++)
3137   {
3138     if ((packet + 1) * max_payload_size < size) 
3139     {
3140       payload_size = max_payload_size;
3141       packet_size = MAX_PACKET_SIZE;
3142     }
3143     else 
3144     {
3145       payload_size = size - packet * max_payload_size;
3146       packet_size =  payload_size + sizeof (struct
3147                                             GNUNET_STREAM_DataMessage); 
3148     }
3149     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3150     io_handle->messages[packet]->header.header.size = htons (packet_size);
3151     io_handle->messages[packet]->header.header.type =
3152       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3153     io_handle->messages[packet]->sequence_number =
3154       htonl (socket->write_sequence_number++);
3155     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3156
3157     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3158        determined from RTT */
3159     io_handle->messages[packet]->ack_deadline =
3160       GNUNET_TIME_relative_hton (ack_deadline);
3161     data_msg = io_handle->messages[packet];
3162     /* Copy data from given buffer to the packet */
3163     memcpy (&data_msg[1],
3164             sweep,
3165             payload_size);
3166     sweep += payload_size;
3167     socket->write_offset += payload_size;
3168   }
3169   socket->write_handle = io_handle;
3170   write_data (socket);
3171
3172   LOG (GNUNET_ERROR_TYPE_DEBUG,
3173        "%s() END\n", __func__);
3174
3175   return io_handle;
3176 }
3177
3178
3179
3180 /**
3181  * Tries to read data from the stream.
3182  *
3183  * @param socket the socket representing a stream
3184  * @param timeout the timeout period
3185  * @param proc function to call with data (once only)
3186  * @param proc_cls the closure for proc
3187  *
3188  * @return handle to cancel the operation; if the stream has been shutdown for
3189  *           this type of opeartion then the DataProcessor is immediately
3190  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3191  */
3192 struct GNUNET_STREAM_IOReadHandle *
3193 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3194                     struct GNUNET_TIME_Relative timeout,
3195                     GNUNET_STREAM_DataProcessor proc,
3196                     void *proc_cls)
3197 {
3198   struct GNUNET_STREAM_IOReadHandle *read_handle;
3199   
3200   LOG (GNUNET_ERROR_TYPE_DEBUG,
3201        "%s: %s()\n", 
3202        GNUNET_i2s (&socket->other_peer),
3203        __func__);
3204
3205   /* Return NULL if there is already a read handle; the user has to cancel that
3206      first before continuing or has to wait until it is completed */
3207   if (NULL != socket->read_handle) return NULL;
3208
3209   GNUNET_assert (NULL != proc);
3210
3211   switch (socket->state)
3212   {
3213   case STATE_RECEIVE_CLOSED:
3214   case STATE_RECEIVE_CLOSE_WAIT:
3215   case STATE_CLOSED:
3216   case STATE_CLOSE_WAIT:
3217     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3218     LOG (GNUNET_ERROR_TYPE_DEBUG,
3219          "%s: %s() END\n",
3220          GNUNET_i2s (&socket->other_peer),
3221          __func__);
3222     return NULL;
3223   default:
3224     break;
3225   }
3226
3227   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3228   read_handle->proc = proc;
3229   read_handle->proc_cls = proc_cls;
3230   socket->read_handle = read_handle;
3231
3232   /* Check if we have a packet at bitmap 0 */
3233   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3234                                           0))
3235   {
3236     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3237                                                      socket);
3238    
3239   }
3240   
3241   /* Setup the read timeout task */
3242   socket->read_io_timeout_task_id =
3243     GNUNET_SCHEDULER_add_delayed (timeout,
3244                                   &read_io_timeout,
3245                                   socket);
3246   LOG (GNUNET_ERROR_TYPE_DEBUG,
3247        "%s: %s() END\n",
3248        GNUNET_i2s (&socket->other_peer),
3249        __func__);
3250   return read_handle;
3251 }
3252
3253
3254 /**
3255  * Cancel pending write operation.
3256  *
3257  * @param ioh handle to operation to cancel
3258  */
3259 void
3260 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3261 {
3262   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3263   unsigned int packet;
3264
3265   GNUNET_assert (NULL != socket->write_handle);
3266   GNUNET_assert (socket->write_handle == ioh);
3267
3268   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3269   {
3270     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3271     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3272   }
3273
3274   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3275   {
3276     if (NULL == ioh->messages[packet]) break;
3277     GNUNET_free (ioh->messages[packet]);
3278   }
3279       
3280   GNUNET_free (socket->write_handle);
3281   socket->write_handle = NULL;
3282   return;
3283 }
3284
3285
3286 /**
3287  * Cancel pending read operation.
3288  *
3289  * @param ioh handle to operation to cancel
3290  */
3291 void
3292 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3293 {
3294   return;
3295 }