remvod verbose debugging in stream api; fixed warning in stream_big test case
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_stream_lib.h"
42 #include "stream_protocol.h"
43
44 #define LOG(kind,...)                                   \
45   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
46
47 /**
48  * The maximum packet size of a stream packet
49  */
50 #define MAX_PACKET_SIZE 512//64000
51
52 /**
53  * Receive buffer
54  */
55 #define RECEIVE_BUFFER_SIZE 4096000
56
57 /**
58  * The maximum payload a data message packet can carry
59  */
60 static const size_t max_payload_size = 
61   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
62
63 /**
64  * states in the Protocol
65  */
66 enum State
67   {
68     /**
69      * Client initialization state
70      */
71     STATE_INIT,
72
73     /**
74      * Listener initialization state 
75      */
76     STATE_LISTEN,
77
78     /**
79      * Pre-connection establishment state
80      */
81     STATE_HELLO_WAIT,
82
83     /**
84      * State where a connection has been established
85      */
86     STATE_ESTABLISHED,
87
88     /**
89      * State where the socket is closed on our side and waiting to be ACK'ed
90      */
91     STATE_RECEIVE_CLOSE_WAIT,
92
93     /**
94      * State where the socket is closed for reading
95      */
96     STATE_RECEIVE_CLOSED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_TRANSMIT_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for writing
105      */
106     STATE_TRANSMIT_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed
115      */
116     STATE_CLOSED 
117   };
118
119
120 /**
121  * Functions of this type are called when a message is written
122  *
123  * @param cls the closure from queue_message
124  * @param socket the socket the written message was bound to
125  */
126 typedef void (*SendFinishCallback) (void *cls,
127                                     struct GNUNET_STREAM_Socket *socket);
128
129
130 /**
131  * The send message queue
132  */
133 struct MessageQueue
134 {
135   /**
136    * The message
137    */
138   struct GNUNET_STREAM_MessageHeader *message;
139
140   /**
141    * Callback to be called when the message is sent
142    */
143   SendFinishCallback finish_cb;
144
145   /**
146    * The closure for finish_cb
147    */
148   void *finish_cb_cls;
149
150   /**
151    * The next message in queue. Should be NULL in the last message
152    */
153   struct MessageQueue *next;
154
155   /**
156    * The next message in queue. Should be NULL in the first message
157    */
158   struct MessageQueue *prev;
159 };
160
161
162 /**
163  * The STREAM Socket Handler
164  */
165 struct GNUNET_STREAM_Socket
166 {
167   /**
168    * Retransmission timeout
169    */
170   struct GNUNET_TIME_Relative retransmit_timeout;
171
172   /**
173    * The Acknowledgement Bitmap
174    */
175   GNUNET_STREAM_AckBitmap ack_bitmap;
176
177   /**
178    * Time when the Acknowledgement was queued
179    */
180   struct GNUNET_TIME_Absolute ack_time_registered;
181
182   /**
183    * Queued Acknowledgement deadline
184    */
185   struct GNUNET_TIME_Relative ack_time_deadline;
186
187   /**
188    * The mesh handle
189    */
190   struct GNUNET_MESH_Handle *mesh;
191
192   /**
193    * The mesh tunnel handle
194    */
195   struct GNUNET_MESH_Tunnel *tunnel;
196
197   /**
198    * Stream open closure
199    */
200   void *open_cls;
201
202   /**
203    * Stream open callback
204    */
205   GNUNET_STREAM_OpenCallback open_cb;
206
207   /**
208    * The current transmit handle (if a pending transmit request exists)
209    */
210   struct GNUNET_MESH_TransmitHandle *transmit_handle;
211
212   /**
213    * The current act transmit handle (if a pending ack transmit request exists)
214    */
215   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
216
217   /**
218    * Pointer to the current ack message using in ack_task
219    */
220   struct GNUNET_STREAM_AckMessage *ack_msg;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for the read io timeout task
265    */
266   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
267
268   /**
269    * Task identifier for retransmission task after timeout
270    */
271   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * Task scheduled to continue a read operation.
280    */
281   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
282
283   /**
284    * The state of the protocol associated with this socket
285    */
286   enum State state;
287
288   /**
289    * The status of the socket
290    */
291   enum GNUNET_STREAM_Status status;
292
293   /**
294    * The number of previous timeouts; FIXME: currently not used
295    */
296   unsigned int retries;
297
298   /**
299    * The application port number (type: uint32_t)
300    */
301   GNUNET_MESH_ApplicationType app_port;
302
303   /**
304    * The session id associated with this stream connection
305    * FIXME: Not used currently, may be removed
306    */
307   uint32_t session_id;
308
309   /**
310    * Write sequence number. Set to random when sending HELLO(client) and
311    * HELLO_ACK(server) 
312    */
313   uint32_t write_sequence_number;
314
315   /**
316    * Read sequence number. This number's value is determined during handshake
317    */
318   uint32_t read_sequence_number;
319
320   /**
321    * The receiver buffer size
322    */
323   uint32_t receive_buffer_size;
324
325   /**
326    * The receiver buffer boundaries
327    */
328   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
329
330   /**
331    * receiver's available buffer after the last acknowledged packet
332    */
333   uint32_t receiver_window_available;
334
335   /**
336    * The offset pointer used during write operation
337    */
338   uint32_t write_offset;
339
340   /**
341    * The offset after which we are expecting data
342    */
343   uint32_t read_offset;
344
345   /**
346    * The offset upto which user has read from the received buffer
347    */
348   uint32_t copy_offset;
349 };
350
351
352 /**
353  * A socket for listening
354  */
355 struct GNUNET_STREAM_ListenSocket
356 {
357   /**
358    * The mesh handle
359    */
360   struct GNUNET_MESH_Handle *mesh;
361
362   /**
363    * The callback function which is called after successful opening socket
364    */
365   GNUNET_STREAM_ListenCallback listen_cb;
366
367   /**
368    * The call back closure
369    */
370   void *listen_cb_cls;
371
372   /**
373    * The service port
374    * FIXME: Remove if not required!
375    */
376   GNUNET_MESH_ApplicationType port;
377 };
378
379
380 /**
381  * The IO Write Handle
382  */
383 struct GNUNET_STREAM_IOWriteHandle
384 {
385   /**
386    * The socket to which this write handle is associated
387    */
388   struct GNUNET_STREAM_Socket *socket;
389
390   /**
391    * The packet_buffers associated with this Handle
392    */
393   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
394
395   /**
396    * The write continuation callback
397    */
398   GNUNET_STREAM_CompletionContinuation write_cont;
399
400   /**
401    * Write continuation closure
402    */
403   void *write_cont_cls;
404
405   /**
406    * The bitmap of this IOHandle; Corresponding bit for a message is set when
407    * it has been acknowledged by the receiver
408    */
409   GNUNET_STREAM_AckBitmap ack_bitmap;
410
411   /**
412    * Number of bytes in this write handle
413    */
414   size_t size;
415 };
416
417
418 /**
419  * The IO Read Handle
420  */
421 struct GNUNET_STREAM_IOReadHandle
422 {
423   /**
424    * Callback for the read processor
425    */
426   GNUNET_STREAM_DataProcessor proc;
427
428   /**
429    * The closure pointer for the read processor callback
430    */
431   void *proc_cls;
432 };
433
434
435 /**
436  * Handle for Shutdown
437  */
438 struct GNUNET_STREAM_ShutdownHandle
439 {
440   /**
441    * The socket associated with this shutdown handle
442    */
443   struct GNUNET_STREAM_Socket *socket;
444
445   /**
446    * Shutdown completion callback
447    */
448   GNUNET_STREAM_ShutdownCompletion completion_cb;
449
450   /**
451    * Closure for completion callback
452    */
453   void *completion_cls;
454
455   /**
456    * Close message retransmission task id
457    */
458   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
459
460   /**
461    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
462    */
463   int operation;  
464 };
465
466
467 /**
468  * Default value in seconds for various timeouts
469  */
470 static unsigned int default_timeout = 10;
471
472
473 /**
474  * Callback function for sending queued message
475  *
476  * @param cls closure the socket
477  * @param size number of bytes available in buf
478  * @param buf where the callee should write the message
479  * @return number of bytes written to buf
480  */
481 static size_t
482 send_message_notify (void *cls, size_t size, void *buf)
483 {
484   struct GNUNET_STREAM_Socket *socket = cls;
485   struct MessageQueue *head;
486   size_t ret;
487
488   socket->transmit_handle = NULL; /* Remove the transmit handle */
489   head = socket->queue_head;
490   if (NULL == head)
491     return 0; /* just to be safe */
492   if (0 == size)                /* request timed out */
493   {
494     socket->retries++;
495     LOG (GNUNET_ERROR_TYPE_DEBUG,
496          "%s: Message sending timed out. Retry %d \n",
497          GNUNET_i2s (&socket->other_peer),
498          socket->retries);
499     socket->transmit_handle = 
500       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
501                                          0, /* Corking */
502                                          1, /* Priority */
503                                          /* FIXME: exponential backoff */
504                                          socket->retransmit_timeout,
505                                          &socket->other_peer,
506                                          ntohs (head->message->header.size),
507                                          &send_message_notify,
508                                          socket);
509     return 0;
510   }
511
512   ret = ntohs (head->message->header.size);
513   GNUNET_assert (size >= ret);
514   memcpy (buf, head->message, ret);
515   if (NULL != head->finish_cb)
516   {
517     head->finish_cb (head->finish_cb_cls, socket);
518   }
519   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
520                                socket->queue_tail,
521                                head);
522   GNUNET_free (head->message);
523   GNUNET_free (head);
524   head = socket->queue_head;
525   if (NULL != head)    /* more pending messages to send */
526   {
527     socket->retries = 0;
528     socket->transmit_handle = 
529       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
530                                          0, /* Corking */
531                                          1, /* Priority */
532                                          /* FIXME: exponential backoff */
533                                          socket->retransmit_timeout,
534                                          &socket->other_peer,
535                                          ntohs (head->message->header.size),
536                                          &send_message_notify,
537                                          socket);
538   }
539   return ret;
540 }
541
542
543 /**
544  * Queues a message for sending using the mesh connection of a socket
545  *
546  * @param socket the socket whose mesh connection is used
547  * @param message the message to be sent
548  * @param finish_cb the callback to be called when the message is sent
549  * @param finish_cb_cls the closure for the callback
550  */
551 static void
552 queue_message (struct GNUNET_STREAM_Socket *socket,
553                struct GNUNET_STREAM_MessageHeader *message,
554                SendFinishCallback finish_cb,
555                void *finish_cb_cls)
556 {
557   struct MessageQueue *queue_entity;
558
559   GNUNET_assert 
560     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
561      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
562
563   LOG (GNUNET_ERROR_TYPE_DEBUG,
564        "%s: Queueing message of type %d and size %d\n",
565        GNUNET_i2s (&socket->other_peer),
566        ntohs (message->header.type),
567        ntohs (message->header.size));
568   GNUNET_assert (NULL != message);
569   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
570   queue_entity->message = message;
571   queue_entity->finish_cb = finish_cb;
572   queue_entity->finish_cb_cls = finish_cb_cls;
573   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
574                                     socket->queue_tail,
575                                     queue_entity);
576   if (NULL == socket->transmit_handle)
577   {
578     socket->retries = 0;
579     socket->transmit_handle = 
580       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
581                                          0, /* Corking */
582                                          1, /* Priority */
583                                          socket->retransmit_timeout,
584                                          &socket->other_peer,
585                                          ntohs (message->header.size),
586                                          &send_message_notify,
587                                          socket);
588   }
589 }
590
591
592 /**
593  * Copies a message and queues it for sending using the mesh connection of
594  * given socket 
595  *
596  * @param socket the socket whose mesh connection is used
597  * @param message the message to be sent
598  * @param finish_cb the callback to be called when the message is sent
599  * @param finish_cb_cls the closure for the callback
600  */
601 static void
602 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
603                         const struct GNUNET_STREAM_MessageHeader *message,
604                         SendFinishCallback finish_cb,
605                         void *finish_cb_cls)
606 {
607   struct GNUNET_STREAM_MessageHeader *msg_copy;
608   uint16_t size;
609   
610   size = ntohs (message->header.size);
611   msg_copy = GNUNET_malloc (size);
612   memcpy (msg_copy, message, size);
613   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
614 }
615
616
617 /**
618  * Callback function for sending ack message
619  *
620  * @param cls closure the ACK message created in ack_task
621  * @param size number of bytes available in buffer
622  * @param buf where the callee should write the message
623  * @return number of bytes written to buf
624  */
625 static size_t
626 send_ack_notify (void *cls, size_t size, void *buf)
627 {
628   struct GNUNET_STREAM_Socket *socket = cls;
629
630   if (0 == size)
631   {
632     LOG (GNUNET_ERROR_TYPE_DEBUG,
633          "%s called with size 0\n", __func__);
634     return 0;
635   }
636   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
637   
638   size = ntohs (socket->ack_msg->header.header.size);
639   memcpy (buf, socket->ack_msg, size);
640   
641   GNUNET_free (socket->ack_msg);
642   socket->ack_msg = NULL;
643   socket->ack_transmit_handle = NULL;
644   return size;
645 }
646
647 /**
648  * Writes data using the given socket. The amount of data written is limited by
649  * the receiver_window_size
650  *
651  * @param socket the socket to use
652  */
653 static void 
654 write_data (struct GNUNET_STREAM_Socket *socket);
655
656 /**
657  * Task for retransmitting data messages if they aren't ACK before their ack
658  * deadline 
659  *
660  * @param cls the socket
661  * @param tc the Task context
662  */
663 static void
664 retransmission_timeout_task (void *cls,
665                              const struct GNUNET_SCHEDULER_TaskContext *tc)
666 {
667   struct GNUNET_STREAM_Socket *socket = cls;
668   
669   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
670     return;
671
672   LOG (GNUNET_ERROR_TYPE_DEBUG,
673        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
674   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
675   write_data (socket);
676 }
677
678
679 /**
680  * Task for sending ACK message
681  *
682  * @param cls the socket
683  * @param tc the Task context
684  */
685 static void
686 ack_task (void *cls,
687           const struct GNUNET_SCHEDULER_TaskContext *tc)
688 {
689   struct GNUNET_STREAM_Socket *socket = cls;
690   struct GNUNET_STREAM_AckMessage *ack_msg;
691
692   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
693   {
694     return;
695   }
696   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
697   /* Create the ACK Message */
698   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
699   ack_msg->header.header.size = htons (sizeof (struct 
700                                                GNUNET_STREAM_AckMessage));
701   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
702   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
703   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
704   ack_msg->receive_window_remaining = 
705     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
706   socket->ack_msg = ack_msg;
707   /* Request MESH for sending ACK */
708   socket->ack_transmit_handle = 
709     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
710                                        0, /* Corking */
711                                        1, /* Priority */
712                                        socket->retransmit_timeout,
713                                        &socket->other_peer,
714                                        ntohs (ack_msg->header.header.size),
715                                        &send_ack_notify,
716                                        socket);
717 }
718
719
720 /**
721  * Retransmission task for shutdown messages
722  *
723  * @param cls the shutdown handle
724  * @param tc the Task Context
725  */
726 static void
727 close_msg_retransmission_task (void *cls,
728                                const struct GNUNET_SCHEDULER_TaskContext *tc)
729 {
730   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
731   struct GNUNET_STREAM_MessageHeader *msg;
732   struct GNUNET_STREAM_Socket *socket;
733
734   GNUNET_assert (NULL != shutdown_handle);
735   socket = shutdown_handle->socket;
736
737   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
738   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
739   switch (shutdown_handle->operation)
740   {
741   case SHUT_RDWR:
742     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
743     break;
744   case SHUT_RD:
745     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
746     break;
747   case SHUT_WR:
748     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
749     break;
750   default:
751     GNUNET_free (msg);
752     shutdown_handle->close_msg_retransmission_task_id = 
753       GNUNET_SCHEDULER_NO_TASK;
754     return;
755   }
756   queue_message (socket, msg, NULL, NULL);
757   shutdown_handle->close_msg_retransmission_task_id =
758     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
759                                   &close_msg_retransmission_task,
760                                   shutdown_handle);
761 }
762
763
764 /**
765  * Function to modify a bit in GNUNET_STREAM_AckBitmap
766  *
767  * @param bitmap the bitmap to modify
768  * @param bit the bit number to modify
769  * @param value GNUNET_YES to on, GNUNET_NO to off
770  */
771 static void
772 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
773                       unsigned int bit, 
774                       int value)
775 {
776   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
777   if (GNUNET_YES == value)
778     *bitmap |= (1LL << bit);
779   else
780     *bitmap &= ~(1LL << bit);
781 }
782
783
784 /**
785  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
786  *
787  * @param bitmap address of the bitmap that has to be checked
788  * @param bit the bit number to check
789  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
790  */
791 static uint8_t
792 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
793                       unsigned int bit)
794 {
795   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
796   return 0 != (*bitmap & (1LL << bit));
797 }
798
799
800 /**
801  * Writes data using the given socket. The amount of data written is limited by
802  * the receiver_window_size
803  *
804  * @param socket the socket to use
805  */
806 static void 
807 write_data (struct GNUNET_STREAM_Socket *socket)
808 {
809   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
810   int packet;                   /* Although an int, should never be negative */
811   int ack_packet;
812
813   ack_packet = -1;
814   /* Find the last acknowledged packet */
815   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
816   {      
817     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
818                                             packet))
819       ack_packet = packet;        
820     else if (NULL == io_handle->messages[packet])
821       break;
822   }
823   /* Resend packets which weren't ack'ed */
824   for (packet=0; packet < ack_packet; packet++)
825   {
826     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
827                                            packet))
828     {
829       LOG (GNUNET_ERROR_TYPE_DEBUG,
830            "%s: Placing DATA message with sequence %u in send queue\n",
831            GNUNET_i2s (&socket->other_peer),
832            ntohl (io_handle->messages[packet]->sequence_number));
833       copy_and_queue_message (socket,
834                               &io_handle->messages[packet]->header,
835                               NULL,
836                               NULL);
837     }
838   }
839   packet = ack_packet + 1;
840   /* Now send new packets if there is enough buffer space */
841   while ( (NULL != io_handle->messages[packet]) &&
842           (socket->receiver_window_available 
843            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
844           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
845   {
846     socket->receiver_window_available -= 
847       ntohs (io_handle->messages[packet]->header.header.size);
848     LOG (GNUNET_ERROR_TYPE_DEBUG,
849          "%s: Placing DATA message with sequence %u in send queue\n",
850          GNUNET_i2s (&socket->other_peer),
851          ntohl (io_handle->messages[packet]->sequence_number));
852     copy_and_queue_message (socket,
853                             &io_handle->messages[packet]->header,
854                             NULL,
855                             NULL);
856     packet++;
857   }
858   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
859     socket->retransmission_timeout_task_id = 
860       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
861                                     (GNUNET_TIME_UNIT_SECONDS, 8),
862                                     &retransmission_timeout_task,
863                                     socket);
864 }
865
866
867 /**
868  * Task for calling the read processor
869  *
870  * @param cls the socket
871  * @param tc the task context
872  */
873 static void
874 call_read_processor (void *cls,
875                      const struct GNUNET_SCHEDULER_TaskContext *tc)
876 {
877   struct GNUNET_STREAM_Socket *socket = cls;
878   size_t read_size;
879   size_t valid_read_size;
880   unsigned int packet;
881   uint32_t sequence_increase;
882   uint32_t offset_increase;
883
884   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
885   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
886     return;
887
888   if (NULL == socket->receive_buffer) 
889     return;
890
891   GNUNET_assert (NULL != socket->read_handle);
892   GNUNET_assert (NULL != socket->read_handle->proc);
893
894   /* Check the bitmap for any holes */
895   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
896   {
897     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
898                                            packet))
899       break;
900   }
901   /* We only call read processor if we have the first packet */
902   GNUNET_assert (0 < packet);
903   valid_read_size = 
904     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
905   GNUNET_assert (0 != valid_read_size);
906   /* Cancel the read_io_timeout_task */
907   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
908   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
909   /* Call the data processor */
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911        "%s: Calling read processor\n",
912        GNUNET_i2s (&socket->other_peer));
913   read_size = 
914     socket->read_handle->proc (socket->read_handle->proc_cls,
915                                socket->status,
916                                socket->receive_buffer + socket->copy_offset,
917                                valid_read_size);
918   LOG (GNUNET_ERROR_TYPE_DEBUG,
919        "%s: Read processor read %d bytes\n",
920        GNUNET_i2s (&socket->other_peer), read_size);
921   LOG (GNUNET_ERROR_TYPE_DEBUG,
922        "%s: Read processor completed successfully\n",
923        GNUNET_i2s (&socket->other_peer));
924   /* Free the read handle */
925   GNUNET_free (socket->read_handle);
926   socket->read_handle = NULL;
927   GNUNET_assert (read_size <= valid_read_size);
928   socket->copy_offset += read_size;
929   /* Determine upto which packet we can remove from the buffer */
930   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
931   {
932     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
933     { packet++; break; }
934     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
935       break;
936   }
937
938   /* If no packets can be removed we can't move the buffer */
939   if (0 == packet) return;
940   sequence_increase = packet;
941   LOG (GNUNET_ERROR_TYPE_DEBUG,
942        "%s: Sequence increase after read processor completion: %u\n",
943        GNUNET_i2s (&socket->other_peer), sequence_increase);
944
945   /* Shift the data in the receive buffer */
946   socket->receive_buffer = 
947     memmove (socket->receive_buffer,
948              socket->receive_buffer 
949              + socket->receive_buffer_boundaries[sequence_increase-1],
950              socket->receive_buffer_size
951              - socket->receive_buffer_boundaries[sequence_increase-1]);
952   /* Shift the bitmap */
953   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
954   /* Set read_sequence_number */
955   socket->read_sequence_number += sequence_increase;
956   /* Set read_offset */
957   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
958   socket->read_offset += offset_increase;
959   /* Fix copy_offset */
960   GNUNET_assert (offset_increase <= socket->copy_offset);
961   socket->copy_offset -= offset_increase;
962   /* Fix relative boundaries */
963   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
964   {
965     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
966     {
967       uint32_t ahead_buffer_boundary;
968
969       ahead_buffer_boundary = 
970         socket->receive_buffer_boundaries[packet + sequence_increase];
971       if (0 == ahead_buffer_boundary)
972         socket->receive_buffer_boundaries[packet] = 0;
973       else
974       {
975         GNUNET_assert (offset_increase < ahead_buffer_boundary);
976         socket->receive_buffer_boundaries[packet] = 
977           ahead_buffer_boundary - offset_increase;
978       }
979     }
980     else
981       socket->receive_buffer_boundaries[packet] = 0;
982   }
983 }
984
985
986 /**
987  * Cancels the existing read io handle
988  *
989  * @param cls the closure from the SCHEDULER call
990  * @param tc the task context
991  */
992 static void
993 read_io_timeout (void *cls, 
994                  const struct GNUNET_SCHEDULER_TaskContext *tc)
995 {
996   struct GNUNET_STREAM_Socket *socket = cls;
997   GNUNET_STREAM_DataProcessor proc;
998   void *proc_cls;
999
1000   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1001   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1002   {
1003     LOG (GNUNET_ERROR_TYPE_DEBUG,
1004          "%s: Read task timedout - Cancelling it\n",
1005          GNUNET_i2s (&socket->other_peer));
1006     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1007     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1008   }
1009   GNUNET_assert (NULL != socket->read_handle);
1010   proc = socket->read_handle->proc;
1011   proc_cls = socket->read_handle->proc_cls;
1012
1013   GNUNET_free (socket->read_handle);
1014   socket->read_handle = NULL;
1015   /* Call the read processor to signal timeout */
1016   proc (proc_cls,
1017         GNUNET_STREAM_TIMEOUT,
1018         NULL,
1019         0);
1020 }
1021
1022
1023 /**
1024  * Handler for DATA messages; Same for both client and server
1025  *
1026  * @param socket the socket through which the ack was received
1027  * @param tunnel connection to the other end
1028  * @param sender who sent the message
1029  * @param msg the data message
1030  * @param atsi performance data for the connection
1031  * @return GNUNET_OK to keep the connection open,
1032  *         GNUNET_SYSERR to close it (signal serious error)
1033  */
1034 static int
1035 handle_data (struct GNUNET_STREAM_Socket *socket,
1036              struct GNUNET_MESH_Tunnel *tunnel,
1037              const struct GNUNET_PeerIdentity *sender,
1038              const struct GNUNET_STREAM_DataMessage *msg,
1039              const struct GNUNET_ATS_Information*atsi)
1040 {
1041   const void *payload;
1042   uint32_t bytes_needed;
1043   uint32_t relative_offset;
1044   uint32_t relative_sequence_number;
1045   uint16_t size;
1046
1047   size = htons (msg->header.header.size);
1048   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1049   {
1050     GNUNET_break_op (0);
1051     return GNUNET_SYSERR;
1052   }
1053
1054   if (0 != memcmp (sender,
1055                    &socket->other_peer,
1056                    sizeof (struct GNUNET_PeerIdentity)))
1057   {
1058     LOG (GNUNET_ERROR_TYPE_DEBUG,
1059          "%s: Received DATA from non-confirming peer\n",
1060          GNUNET_i2s (&socket->other_peer));
1061     return GNUNET_YES;
1062   }
1063
1064   switch (socket->state)
1065   {
1066   case STATE_ESTABLISHED:
1067   case STATE_TRANSMIT_CLOSED:
1068   case STATE_TRANSMIT_CLOSE_WAIT:
1069       
1070     /* check if the message's sequence number is in the range we are
1071        expecting */
1072     relative_sequence_number = 
1073       ntohl (msg->sequence_number) - socket->read_sequence_number;
1074     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1075     {
1076       LOG (GNUNET_ERROR_TYPE_DEBUG,
1077            "%s: Ignoring received message with sequence number %u\n",
1078            GNUNET_i2s (&socket->other_peer),
1079            ntohl (msg->sequence_number));
1080       /* Start ACK sending task if one is not already present */
1081       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1082       {
1083         socket->ack_task_id = 
1084           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1085                                         (msg->ack_deadline),
1086                                         &ack_task,
1087                                         socket);
1088       }
1089       return GNUNET_YES;
1090     }
1091       
1092     /* Check if we have already seen this message */
1093     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1094                                             relative_sequence_number))
1095     {
1096       LOG (GNUNET_ERROR_TYPE_DEBUG,
1097            "%s: Ignoring already received message with sequence number %u\n",
1098            GNUNET_i2s (&socket->other_peer),
1099            ntohl (msg->sequence_number));
1100       /* Start ACK sending task if one is not already present */
1101       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1102       {
1103         socket->ack_task_id = 
1104           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1105                                         (msg->ack_deadline),
1106                                         &ack_task,
1107                                         socket);
1108       }
1109       return GNUNET_YES;
1110     }
1111
1112     LOG (GNUNET_ERROR_TYPE_DEBUG,
1113          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1114          GNUNET_i2s (&socket->other_peer),
1115          ntohl (msg->sequence_number),
1116          ntohs (msg->header.header.size),
1117          GNUNET_i2s (&socket->other_peer));
1118       
1119     /* Check if we have to allocate the buffer */
1120     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1121     relative_offset = ntohl (msg->offset) - socket->read_offset;
1122     bytes_needed = relative_offset + size;
1123     if (bytes_needed > socket->receive_buffer_size)
1124     {
1125       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1126       {
1127         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1128                                                  bytes_needed);
1129         socket->receive_buffer_size = bytes_needed;
1130       }
1131       else
1132       {
1133         LOG (GNUNET_ERROR_TYPE_DEBUG,
1134              "%s: Cannot accommodate packet %d as buffer is full\n",
1135              GNUNET_i2s (&socket->other_peer),
1136              ntohl (msg->sequence_number));
1137         return GNUNET_YES;
1138       }
1139     }
1140       
1141     /* Copy Data to buffer */
1142     payload = &msg[1];
1143     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1144     memcpy (socket->receive_buffer + relative_offset,
1145             payload,
1146             size);
1147     socket->receive_buffer_boundaries[relative_sequence_number] = 
1148       relative_offset + size;
1149       
1150     /* Modify the ACK bitmap */
1151     ackbitmap_modify_bit (&socket->ack_bitmap,
1152                           relative_sequence_number,
1153                           GNUNET_YES);
1154
1155     /* Start ACK sending task if one is not already present */
1156     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1157     {
1158       socket->ack_task_id = 
1159         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1160                                       (msg->ack_deadline),
1161                                       &ack_task,
1162                                       socket);
1163     }
1164
1165     if ((NULL != socket->read_handle) /* A read handle is waiting */
1166         /* There is no current read task */
1167         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1168         /* We have the first packet */
1169         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1170                                                0)))
1171     {
1172       LOG (GNUNET_ERROR_TYPE_DEBUG,
1173            "%s: Scheduling read processor\n",
1174            GNUNET_i2s (&socket->other_peer));
1175           
1176       socket->read_task_id = 
1177         GNUNET_SCHEDULER_add_now (&call_read_processor,
1178                                   socket);
1179     }
1180       
1181     break;
1182
1183   default:
1184     LOG (GNUNET_ERROR_TYPE_DEBUG,
1185          "%s: Received data message when it cannot be handled\n",
1186          GNUNET_i2s (&socket->other_peer));
1187     break;
1188   }
1189   return GNUNET_YES;
1190 }
1191
1192
1193 /**
1194  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1195  *
1196  * @param cls the socket (set from GNUNET_MESH_connect)
1197  * @param tunnel connection to the other end
1198  * @param tunnel_ctx place to store local state associated with the tunnel
1199  * @param sender who sent the message
1200  * @param message the actual message
1201  * @param atsi performance data for the connection
1202  * @return GNUNET_OK to keep the connection open,
1203  *         GNUNET_SYSERR to close it (signal serious error)
1204  */
1205 static int
1206 client_handle_data (void *cls,
1207                     struct GNUNET_MESH_Tunnel *tunnel,
1208                     void **tunnel_ctx,
1209                     const struct GNUNET_PeerIdentity *sender,
1210                     const struct GNUNET_MessageHeader *message,
1211                     const struct GNUNET_ATS_Information*atsi)
1212 {
1213   struct GNUNET_STREAM_Socket *socket = cls;
1214
1215   return handle_data (socket, 
1216                       tunnel, 
1217                       sender, 
1218                       (const struct GNUNET_STREAM_DataMessage *) message, 
1219                       atsi);
1220 }
1221
1222
1223 /**
1224  * Callback to set state to ESTABLISHED
1225  *
1226  * @param cls the closure from queue_message FIXME: document
1227  * @param socket the socket to requiring state change
1228  */
1229 static void
1230 set_state_established (void *cls,
1231                        struct GNUNET_STREAM_Socket *socket)
1232 {
1233   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1234        "%s: Attaining ESTABLISHED state\n",
1235        GNUNET_i2s (&socket->other_peer));
1236   socket->write_offset = 0;
1237   socket->read_offset = 0;
1238   socket->state = STATE_ESTABLISHED;
1239   /* FIXME: What if listen_cb is NULL */
1240   if (NULL != socket->lsocket)
1241   {
1242     LOG (GNUNET_ERROR_TYPE_DEBUG,
1243          "%s: Calling listen callback\n",
1244          GNUNET_i2s (&socket->other_peer));
1245     if (GNUNET_SYSERR == 
1246         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1247                                     socket,
1248                                     &socket->other_peer))
1249     {
1250       socket->state = STATE_CLOSED;
1251       /* FIXME: We should close in a decent way */
1252       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1253       GNUNET_free (socket);
1254     }
1255   }
1256   else if (socket->open_cb)
1257     socket->open_cb (socket->open_cls, socket);
1258 }
1259
1260
1261 /**
1262  * Callback to set state to HELLO_WAIT
1263  *
1264  * @param cls the closure from queue_message
1265  * @param socket the socket to requiring state change
1266  */
1267 static void
1268 set_state_hello_wait (void *cls,
1269                       struct GNUNET_STREAM_Socket *socket)
1270 {
1271   GNUNET_assert (STATE_INIT == socket->state);
1272   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1273        "%s: Attaining HELLO_WAIT state\n",
1274        GNUNET_i2s (&socket->other_peer));
1275   socket->state = STATE_HELLO_WAIT;
1276 }
1277
1278
1279 /**
1280  * Callback to set state to CLOSE_WAIT
1281  *
1282  * @param cls the closure from queue_message
1283  * @param socket the socket requiring state change
1284  */
1285 static void
1286 set_state_close_wait (void *cls,
1287                       struct GNUNET_STREAM_Socket *socket)
1288 {
1289   LOG (GNUNET_ERROR_TYPE_DEBUG,
1290        "%s: Attaing CLOSE_WAIT state\n",
1291        GNUNET_i2s (&socket->other_peer));
1292   socket->state = STATE_CLOSE_WAIT;
1293   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1294   socket->receive_buffer = NULL;
1295   socket->receive_buffer_size = 0;
1296 }
1297
1298
1299 /**
1300  * Callback to set state to RECEIVE_CLOSE_WAIT
1301  *
1302  * @param cls the closure from queue_message
1303  * @param socket the socket requiring state change
1304  */
1305 static void
1306 set_state_receive_close_wait (void *cls,
1307                               struct GNUNET_STREAM_Socket *socket)
1308 {
1309   LOG (GNUNET_ERROR_TYPE_DEBUG,
1310        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1311        GNUNET_i2s (&socket->other_peer));
1312   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1313   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1314   socket->receive_buffer = NULL;
1315   socket->receive_buffer_size = 0;
1316 }
1317
1318
1319 /**
1320  * Callback to set state to TRANSMIT_CLOSE_WAIT
1321  *
1322  * @param cls the closure from queue_message
1323  * @param socket the socket requiring state change
1324  */
1325 static void
1326 set_state_transmit_close_wait (void *cls,
1327                                struct GNUNET_STREAM_Socket *socket)
1328 {
1329   LOG (GNUNET_ERROR_TYPE_DEBUG,
1330        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1331        GNUNET_i2s (&socket->other_peer));
1332   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1333 }
1334
1335
1336 /**
1337  * Callback to set state to CLOSED
1338  *
1339  * @param cls the closure from queue_message
1340  * @param socket the socket requiring state change
1341  */
1342 static void
1343 set_state_closed (void *cls,
1344                   struct GNUNET_STREAM_Socket *socket)
1345 {
1346   socket->state = STATE_CLOSED;
1347 }
1348
1349 /**
1350  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1351  * socket
1352  *
1353  * @param socket the socket for which this HelloAckMessage has to be generated
1354  * @return the HelloAckMessage
1355  */
1356 static struct GNUNET_STREAM_HelloAckMessage *
1357 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1358 {
1359   struct GNUNET_STREAM_HelloAckMessage *msg;
1360
1361   /* Get the random sequence number */
1362   socket->write_sequence_number = 
1363     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1364   LOG (GNUNET_ERROR_TYPE_DEBUG,
1365        "%s: Generated write sequence number %u\n",
1366        GNUNET_i2s (&socket->other_peer),
1367        (unsigned int) socket->write_sequence_number);
1368   
1369   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1370   msg->header.header.size = 
1371     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1372   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1373   msg->sequence_number = htonl (socket->write_sequence_number);
1374   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1375
1376   return msg;
1377 }
1378
1379
1380 /**
1381  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1382  *
1383  * @param cls the socket (set from GNUNET_MESH_connect)
1384  * @param tunnel connection to the other end
1385  * @param tunnel_ctx this is NULL
1386  * @param sender who sent the message
1387  * @param message the actual message
1388  * @param atsi performance data for the connection
1389  * @return GNUNET_OK to keep the connection open,
1390  *         GNUNET_SYSERR to close it (signal serious error)
1391  */
1392 static int
1393 client_handle_hello_ack (void *cls,
1394                          struct GNUNET_MESH_Tunnel *tunnel,
1395                          void **tunnel_ctx,
1396                          const struct GNUNET_PeerIdentity *sender,
1397                          const struct GNUNET_MessageHeader *message,
1398                          const struct GNUNET_ATS_Information*atsi)
1399 {
1400   struct GNUNET_STREAM_Socket *socket = cls;
1401   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1402   struct GNUNET_STREAM_HelloAckMessage *reply;
1403
1404   if (0 != memcmp (sender,
1405                    &socket->other_peer,
1406                    sizeof (struct GNUNET_PeerIdentity)))
1407   {
1408     LOG (GNUNET_ERROR_TYPE_DEBUG,
1409          "%s: Received HELLO_ACK from non-confirming peer\n",
1410          GNUNET_i2s (&socket->other_peer));
1411     return GNUNET_YES;
1412   }
1413   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1414   LOG (GNUNET_ERROR_TYPE_DEBUG,
1415        "%s: Received HELLO_ACK from %s\n",
1416        GNUNET_i2s (&socket->other_peer),
1417        GNUNET_i2s (&socket->other_peer));
1418
1419   GNUNET_assert (socket->tunnel == tunnel);
1420   switch (socket->state)
1421   {
1422   case STATE_HELLO_WAIT:
1423     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1424     LOG (GNUNET_ERROR_TYPE_DEBUG,
1425          "%s: Read sequence number %u\n",
1426          GNUNET_i2s (&socket->other_peer),
1427          (unsigned int) socket->read_sequence_number);
1428     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1429     reply = generate_hello_ack_msg (socket);
1430     queue_message (socket,
1431                    &reply->header, 
1432                    &set_state_established, 
1433                    NULL);      
1434     return GNUNET_OK;
1435   case STATE_ESTABLISHED:
1436   case STATE_RECEIVE_CLOSE_WAIT:
1437     // call statistics (# ACKs ignored++)
1438     return GNUNET_OK;
1439   case STATE_INIT:
1440   default:
1441     LOG (GNUNET_ERROR_TYPE_DEBUG,
1442          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1443          GNUNET_i2s (&socket->other_peer),
1444          GNUNET_i2s (&socket->other_peer),
1445          socket->state);
1446     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1447     return GNUNET_SYSERR;
1448   }
1449
1450 }
1451
1452
1453 /**
1454  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1455  *
1456  * @param cls the socket (set from GNUNET_MESH_connect)
1457  * @param tunnel connection to the other end
1458  * @param tunnel_ctx this is NULL
1459  * @param sender who sent the message
1460  * @param message the actual message
1461  * @param atsi performance data for the connection
1462  * @return GNUNET_OK to keep the connection open,
1463  *         GNUNET_SYSERR to close it (signal serious error)
1464  */
1465 static int
1466 client_handle_reset (void *cls,
1467                      struct GNUNET_MESH_Tunnel *tunnel,
1468                      void **tunnel_ctx,
1469                      const struct GNUNET_PeerIdentity *sender,
1470                      const struct GNUNET_MessageHeader *message,
1471                      const struct GNUNET_ATS_Information*atsi)
1472 {
1473   // struct GNUNET_STREAM_Socket *socket = cls;
1474
1475   return GNUNET_OK;
1476 }
1477
1478
1479 /**
1480  * Common message handler for handling TRANSMIT_CLOSE messages
1481  *
1482  * @param socket the socket through which the ack was received
1483  * @param tunnel connection to the other end
1484  * @param sender who sent the message
1485  * @param msg the transmit close message
1486  * @param atsi performance data for the connection
1487  * @return GNUNET_OK to keep the connection open,
1488  *         GNUNET_SYSERR to close it (signal serious error)
1489  */
1490 static int
1491 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1492                        struct GNUNET_MESH_Tunnel *tunnel,
1493                        const struct GNUNET_PeerIdentity *sender,
1494                        const struct GNUNET_STREAM_MessageHeader *msg,
1495                        const struct GNUNET_ATS_Information*atsi)
1496 {
1497   struct GNUNET_STREAM_MessageHeader *reply;
1498
1499   switch (socket->state)
1500   {
1501   case STATE_ESTABLISHED:
1502     socket->state = STATE_RECEIVE_CLOSED;
1503
1504     /* Send TRANSMIT_CLOSE_ACK */
1505     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1506     reply->header.type = 
1507       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1508     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1509     queue_message (socket, reply, NULL, NULL);
1510     break;
1511
1512   default:
1513     /* FIXME: Call statistics? */
1514     break;
1515   }
1516   return GNUNET_YES;
1517 }
1518
1519
1520 /**
1521  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1522  *
1523  * @param cls the socket (set from GNUNET_MESH_connect)
1524  * @param tunnel connection to the other end
1525  * @param tunnel_ctx this is NULL
1526  * @param sender who sent the message
1527  * @param message the actual message
1528  * @param atsi performance data for the connection
1529  * @return GNUNET_OK to keep the connection open,
1530  *         GNUNET_SYSERR to close it (signal serious error)
1531  */
1532 static int
1533 client_handle_transmit_close (void *cls,
1534                               struct GNUNET_MESH_Tunnel *tunnel,
1535                               void **tunnel_ctx,
1536                               const struct GNUNET_PeerIdentity *sender,
1537                               const struct GNUNET_MessageHeader *message,
1538                               const struct GNUNET_ATS_Information*atsi)
1539 {
1540   struct GNUNET_STREAM_Socket *socket = cls;
1541   
1542   return handle_transmit_close (socket,
1543                                 tunnel,
1544                                 sender,
1545                                 (struct GNUNET_STREAM_MessageHeader *)message,
1546                                 atsi);
1547 }
1548
1549
1550 /**
1551  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1552  *
1553  * @param socket the socket
1554  * @param tunnel connection to the other end
1555  * @param sender who sent the message
1556  * @param message the actual message
1557  * @param atsi performance data for the connection
1558  * @param operation the close operation which is being ACK'ed
1559  * @return GNUNET_OK to keep the connection open,
1560  *         GNUNET_SYSERR to close it (signal serious error)
1561  */
1562 static int
1563 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1564                           struct GNUNET_MESH_Tunnel *tunnel,
1565                           const struct GNUNET_PeerIdentity *sender,
1566                           const struct GNUNET_STREAM_MessageHeader *message,
1567                           const struct GNUNET_ATS_Information *atsi,
1568                           int operation)
1569 {
1570   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1571
1572   shutdown_handle = socket->shutdown_handle;
1573   if (NULL == shutdown_handle)
1574   {
1575     LOG (GNUNET_ERROR_TYPE_DEBUG,
1576          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1577          GNUNET_i2s (&socket->other_peer));
1578     return GNUNET_OK;
1579   }
1580
1581   switch (operation)
1582   {
1583   case SHUT_RDWR:
1584     switch (socket->state)
1585     {
1586     case STATE_CLOSE_WAIT:
1587       if (SHUT_RDWR != shutdown_handle->operation)
1588       {
1589         LOG (GNUNET_ERROR_TYPE_DEBUG,
1590              "%s: Received CLOSE_ACK when shutdown handle is not for "
1591              "SHUT_RDWR\n",
1592              GNUNET_i2s (&socket->other_peer));
1593         return GNUNET_OK;
1594       }
1595
1596       LOG (GNUNET_ERROR_TYPE_DEBUG,
1597            "%s: Received CLOSE_ACK from %s\n",
1598            GNUNET_i2s (&socket->other_peer),
1599            GNUNET_i2s (&socket->other_peer));
1600       socket->state = STATE_CLOSED;
1601       break;
1602     default:
1603       LOG (GNUNET_ERROR_TYPE_DEBUG,
1604            "%s: Received CLOSE_ACK when in it not expected\n",
1605            GNUNET_i2s (&socket->other_peer));
1606       return GNUNET_OK;
1607     }
1608     break;
1609
1610   case SHUT_RD:
1611     switch (socket->state)
1612     {
1613     case STATE_RECEIVE_CLOSE_WAIT:
1614       if (SHUT_RD != shutdown_handle->operation)
1615       {
1616         LOG (GNUNET_ERROR_TYPE_DEBUG,
1617              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1618              "is not for SHUT_RD\n",
1619              GNUNET_i2s (&socket->other_peer));
1620         return GNUNET_OK;
1621       }
1622
1623       LOG (GNUNET_ERROR_TYPE_DEBUG,
1624            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1625            GNUNET_i2s (&socket->other_peer),
1626            GNUNET_i2s (&socket->other_peer));
1627       socket->state = STATE_RECEIVE_CLOSED;
1628       break;
1629     default:
1630       LOG (GNUNET_ERROR_TYPE_DEBUG,
1631            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1632            GNUNET_i2s (&socket->other_peer));
1633       return GNUNET_OK;
1634     }
1635
1636     break;
1637   case SHUT_WR:
1638     switch (socket->state)
1639     {
1640     case STATE_TRANSMIT_CLOSE_WAIT:
1641       if (SHUT_WR != shutdown_handle->operation)
1642       {
1643         LOG (GNUNET_ERROR_TYPE_DEBUG,
1644              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1645              "is not for SHUT_WR\n",
1646              GNUNET_i2s (&socket->other_peer));
1647         return GNUNET_OK;
1648       }
1649
1650       LOG (GNUNET_ERROR_TYPE_DEBUG,
1651            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1652            GNUNET_i2s (&socket->other_peer),
1653            GNUNET_i2s (&socket->other_peer));
1654       socket->state = STATE_TRANSMIT_CLOSED;
1655       break;
1656     default:
1657       LOG (GNUNET_ERROR_TYPE_DEBUG,
1658            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1659            GNUNET_i2s (&socket->other_peer));
1660           
1661       return GNUNET_OK;
1662     }
1663     break;
1664   default:
1665     GNUNET_assert (0);
1666   }
1667
1668   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1669     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1670                                    operation);
1671   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1672   socket->shutdown_handle = NULL;
1673   if (GNUNET_SCHEDULER_NO_TASK
1674       != shutdown_handle->close_msg_retransmission_task_id)
1675   {
1676     GNUNET_SCHEDULER_cancel
1677       (shutdown_handle->close_msg_retransmission_task_id);
1678     shutdown_handle->close_msg_retransmission_task_id =
1679       GNUNET_SCHEDULER_NO_TASK;
1680   }
1681   return GNUNET_OK;
1682 }
1683
1684
1685 /**
1686  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1687  *
1688  * @param cls the socket (set from GNUNET_MESH_connect)
1689  * @param tunnel connection to the other end
1690  * @param tunnel_ctx this is NULL
1691  * @param sender who sent the message
1692  * @param message the actual message
1693  * @param atsi performance data for the connection
1694  * @return GNUNET_OK to keep the connection open,
1695  *         GNUNET_SYSERR to close it (signal serious error)
1696  */
1697 static int
1698 client_handle_transmit_close_ack (void *cls,
1699                                   struct GNUNET_MESH_Tunnel *tunnel,
1700                                   void **tunnel_ctx,
1701                                   const struct GNUNET_PeerIdentity *sender,
1702                                   const struct GNUNET_MessageHeader *message,
1703                                   const struct GNUNET_ATS_Information*atsi)
1704 {
1705   struct GNUNET_STREAM_Socket *socket = cls;
1706
1707   return handle_generic_close_ack (socket,
1708                                    tunnel,
1709                                    sender,
1710                                    (const struct GNUNET_STREAM_MessageHeader *)
1711                                    message,
1712                                    atsi,
1713                                    SHUT_WR);
1714 }
1715
1716
1717 /**
1718  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1719  *
1720  * @param socket the socket
1721  * @param tunnel connection to the other end
1722  * @param sender who sent the message
1723  * @param message the actual message
1724  * @param atsi performance data for the connection
1725  * @return GNUNET_OK to keep the connection open,
1726  *         GNUNET_SYSERR to close it (signal serious error)
1727  */
1728 static int
1729 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1730                       struct GNUNET_MESH_Tunnel *tunnel,
1731                       const struct GNUNET_PeerIdentity *sender,
1732                       const struct GNUNET_STREAM_MessageHeader *message,
1733                       const struct GNUNET_ATS_Information *atsi)
1734 {
1735   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1736
1737   switch (socket->state)
1738   {
1739   case STATE_INIT:
1740   case STATE_LISTEN:
1741   case STATE_HELLO_WAIT:
1742     LOG (GNUNET_ERROR_TYPE_DEBUG,
1743          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1744          GNUNET_i2s (&socket->other_peer));
1745     return GNUNET_OK;
1746   default:
1747     break;
1748   }
1749   
1750   LOG (GNUNET_ERROR_TYPE_DEBUG,
1751        "%s: Received RECEIVE_CLOSE from %s\n",
1752        GNUNET_i2s (&socket->other_peer),
1753        GNUNET_i2s (&socket->other_peer));
1754   receive_close_ack =
1755     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1756   receive_close_ack->header.size =
1757     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1758   receive_close_ack->header.type =
1759     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1760   queue_message (socket,
1761                  receive_close_ack,
1762                  &set_state_closed,
1763                  NULL);
1764   
1765   /* FIXME: Handle the case where write handle is present; the write operation
1766      should be deemed as finised and the write continuation callback
1767      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1768   return GNUNET_OK;
1769 }
1770
1771
1772 /**
1773  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1774  *
1775  * @param cls the socket (set from GNUNET_MESH_connect)
1776  * @param tunnel connection to the other end
1777  * @param tunnel_ctx this is NULL
1778  * @param sender who sent the message
1779  * @param message the actual message
1780  * @param atsi performance data for the connection
1781  * @return GNUNET_OK to keep the connection open,
1782  *         GNUNET_SYSERR to close it (signal serious error)
1783  */
1784 static int
1785 client_handle_receive_close (void *cls,
1786                              struct GNUNET_MESH_Tunnel *tunnel,
1787                              void **tunnel_ctx,
1788                              const struct GNUNET_PeerIdentity *sender,
1789                              const struct GNUNET_MessageHeader *message,
1790                              const struct GNUNET_ATS_Information*atsi)
1791 {
1792   struct GNUNET_STREAM_Socket *socket = cls;
1793
1794   return
1795     handle_receive_close (socket,
1796                           tunnel,
1797                           sender,
1798                           (const struct GNUNET_STREAM_MessageHeader *) message,
1799                           atsi);
1800 }
1801
1802
1803 /**
1804  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1805  *
1806  * @param cls the socket (set from GNUNET_MESH_connect)
1807  * @param tunnel connection to the other end
1808  * @param tunnel_ctx this is NULL
1809  * @param sender who sent the message
1810  * @param message the actual message
1811  * @param atsi performance data for the connection
1812  * @return GNUNET_OK to keep the connection open,
1813  *         GNUNET_SYSERR to close it (signal serious error)
1814  */
1815 static int
1816 client_handle_receive_close_ack (void *cls,
1817                                  struct GNUNET_MESH_Tunnel *tunnel,
1818                                  void **tunnel_ctx,
1819                                  const struct GNUNET_PeerIdentity *sender,
1820                                  const struct GNUNET_MessageHeader *message,
1821                                  const struct GNUNET_ATS_Information*atsi)
1822 {
1823   struct GNUNET_STREAM_Socket *socket = cls;
1824
1825   return handle_generic_close_ack (socket,
1826                                    tunnel,
1827                                    sender,
1828                                    (const struct GNUNET_STREAM_MessageHeader *)
1829                                    message,
1830                                    atsi,
1831                                    SHUT_RD);
1832 }
1833
1834
1835 /**
1836  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1837  *
1838  * @param socket the socket
1839  * @param tunnel connection to the other end
1840  * @param sender who sent the message
1841  * @param message the actual message
1842  * @param atsi performance data for the connection
1843  * @return GNUNET_OK to keep the connection open,
1844  *         GNUNET_SYSERR to close it (signal serious error)
1845  */
1846 static int
1847 handle_close (struct GNUNET_STREAM_Socket *socket,
1848               struct GNUNET_MESH_Tunnel *tunnel,
1849               const struct GNUNET_PeerIdentity *sender,
1850               const struct GNUNET_STREAM_MessageHeader *message,
1851               const struct GNUNET_ATS_Information*atsi)
1852 {
1853   struct GNUNET_STREAM_MessageHeader *close_ack;
1854
1855   switch (socket->state)
1856   {
1857   case STATE_INIT:
1858   case STATE_LISTEN:
1859   case STATE_HELLO_WAIT:
1860     LOG (GNUNET_ERROR_TYPE_DEBUG,
1861          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1862          GNUNET_i2s (&socket->other_peer));
1863     return GNUNET_OK;
1864   default:
1865     break;
1866   }
1867
1868   LOG (GNUNET_ERROR_TYPE_DEBUG,
1869        "%s: Received CLOSE from %s\n",
1870        GNUNET_i2s (&socket->other_peer),
1871        GNUNET_i2s (&socket->other_peer));
1872   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1873   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1874   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1875   queue_message (socket,
1876                  close_ack,
1877                  &set_state_closed,
1878                  NULL);
1879   if (socket->state == STATE_CLOSED)
1880     return GNUNET_OK;
1881
1882   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1883   socket->receive_buffer = NULL;
1884   socket->receive_buffer_size = 0;
1885   return GNUNET_OK;
1886 }
1887
1888
1889 /**
1890  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1891  *
1892  * @param cls the socket (set from GNUNET_MESH_connect)
1893  * @param tunnel connection to the other end
1894  * @param tunnel_ctx this is NULL
1895  * @param sender who sent the message
1896  * @param message the actual message
1897  * @param atsi performance data for the connection
1898  * @return GNUNET_OK to keep the connection open,
1899  *         GNUNET_SYSERR to close it (signal serious error)
1900  */
1901 static int
1902 client_handle_close (void *cls,
1903                      struct GNUNET_MESH_Tunnel *tunnel,
1904                      void **tunnel_ctx,
1905                      const struct GNUNET_PeerIdentity *sender,
1906                      const struct GNUNET_MessageHeader *message,
1907                      const struct GNUNET_ATS_Information*atsi)
1908 {
1909   struct GNUNET_STREAM_Socket *socket = cls;
1910
1911   return handle_close (socket,
1912                        tunnel,
1913                        sender,
1914                        (const struct GNUNET_STREAM_MessageHeader *) message,
1915                        atsi);
1916 }
1917
1918
1919 /**
1920  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1921  *
1922  * @param cls the socket (set from GNUNET_MESH_connect)
1923  * @param tunnel connection to the other end
1924  * @param tunnel_ctx this is NULL
1925  * @param sender who sent the message
1926  * @param message the actual message
1927  * @param atsi performance data for the connection
1928  * @return GNUNET_OK to keep the connection open,
1929  *         GNUNET_SYSERR to close it (signal serious error)
1930  */
1931 static int
1932 client_handle_close_ack (void *cls,
1933                          struct GNUNET_MESH_Tunnel *tunnel,
1934                          void **tunnel_ctx,
1935                          const struct GNUNET_PeerIdentity *sender,
1936                          const struct GNUNET_MessageHeader *message,
1937                          const struct GNUNET_ATS_Information *atsi)
1938 {
1939   struct GNUNET_STREAM_Socket *socket = cls;
1940
1941   return handle_generic_close_ack (socket,
1942                                    tunnel,
1943                                    sender,
1944                                    (const struct GNUNET_STREAM_MessageHeader *) 
1945                                    message,
1946                                    atsi,
1947                                    SHUT_RDWR);
1948 }
1949
1950 /*****************************/
1951 /* Server's Message Handlers */
1952 /*****************************/
1953
1954 /**
1955  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1956  *
1957  * @param cls the closure
1958  * @param tunnel connection to the other end
1959  * @param tunnel_ctx the socket
1960  * @param sender who sent the message
1961  * @param message the actual message
1962  * @param atsi performance data for the connection
1963  * @return GNUNET_OK to keep the connection open,
1964  *         GNUNET_SYSERR to close it (signal serious error)
1965  */
1966 static int
1967 server_handle_data (void *cls,
1968                     struct GNUNET_MESH_Tunnel *tunnel,
1969                     void **tunnel_ctx,
1970                     const struct GNUNET_PeerIdentity *sender,
1971                     const struct GNUNET_MessageHeader *message,
1972                     const struct GNUNET_ATS_Information*atsi)
1973 {
1974   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1975
1976   return handle_data (socket,
1977                       tunnel,
1978                       sender,
1979                       (const struct GNUNET_STREAM_DataMessage *)message,
1980                       atsi);
1981 }
1982
1983
1984 /**
1985  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1986  *
1987  * @param cls the closure
1988  * @param tunnel connection to the other end
1989  * @param tunnel_ctx the socket
1990  * @param sender who sent the message
1991  * @param message the actual message
1992  * @param atsi performance data for the connection
1993  * @return GNUNET_OK to keep the connection open,
1994  *         GNUNET_SYSERR to close it (signal serious error)
1995  */
1996 static int
1997 server_handle_hello (void *cls,
1998                      struct GNUNET_MESH_Tunnel *tunnel,
1999                      void **tunnel_ctx,
2000                      const struct GNUNET_PeerIdentity *sender,
2001                      const struct GNUNET_MessageHeader *message,
2002                      const struct GNUNET_ATS_Information*atsi)
2003 {
2004   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2005   struct GNUNET_STREAM_HelloAckMessage *reply;
2006
2007   if (0 != memcmp (sender,
2008                    &socket->other_peer,
2009                    sizeof (struct GNUNET_PeerIdentity)))
2010   {
2011     LOG (GNUNET_ERROR_TYPE_DEBUG,
2012          "%s: Received HELLO from non-confirming peer\n",
2013          GNUNET_i2s (&socket->other_peer));
2014     return GNUNET_YES;
2015   }
2016
2017   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2018                  ntohs (message->type));
2019   GNUNET_assert (socket->tunnel == tunnel);
2020   LOG (GNUNET_ERROR_TYPE_DEBUG,
2021        "%s: Received HELLO from %s\n", 
2022        GNUNET_i2s (&socket->other_peer),
2023        GNUNET_i2s (&socket->other_peer));
2024
2025   if (STATE_INIT == socket->state)
2026   {
2027     reply = generate_hello_ack_msg (socket);
2028     queue_message (socket, 
2029                    &reply->header,
2030                    &set_state_hello_wait, 
2031                    NULL);
2032   }
2033   else
2034   {
2035     LOG (GNUNET_ERROR_TYPE_DEBUG,
2036          "%s: Client sent HELLO when in state %d\n", 
2037          GNUNET_i2s (&socket->other_peer),
2038          socket->state);
2039     /* FIXME: Send RESET? */
2040       
2041   }
2042   return GNUNET_OK;
2043 }
2044
2045
2046 /**
2047  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2048  *
2049  * @param cls the closure
2050  * @param tunnel connection to the other end
2051  * @param tunnel_ctx the socket
2052  * @param sender who sent the message
2053  * @param message the actual message
2054  * @param atsi performance data for the connection
2055  * @return GNUNET_OK to keep the connection open,
2056  *         GNUNET_SYSERR to close it (signal serious error)
2057  */
2058 static int
2059 server_handle_hello_ack (void *cls,
2060                          struct GNUNET_MESH_Tunnel *tunnel,
2061                          void **tunnel_ctx,
2062                          const struct GNUNET_PeerIdentity *sender,
2063                          const struct GNUNET_MessageHeader *message,
2064                          const struct GNUNET_ATS_Information*atsi)
2065 {
2066   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2067   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2068
2069   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2070                  ntohs (message->type));
2071   GNUNET_assert (socket->tunnel == tunnel);
2072   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2073   if (STATE_HELLO_WAIT == socket->state)
2074   {
2075     LOG (GNUNET_ERROR_TYPE_DEBUG,
2076          "%s: Received HELLO_ACK from %s\n",
2077          GNUNET_i2s (&socket->other_peer),
2078          GNUNET_i2s (&socket->other_peer));
2079     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2080     LOG (GNUNET_ERROR_TYPE_DEBUG,
2081          "%s: Read sequence number %u\n",
2082          GNUNET_i2s (&socket->other_peer),
2083          (unsigned int) socket->read_sequence_number);
2084     socket->receiver_window_available = 
2085       ntohl (ack_message->receiver_window_size);
2086     /* Attain ESTABLISHED state */
2087     set_state_established (NULL, socket);
2088   }
2089   else
2090   {
2091     LOG (GNUNET_ERROR_TYPE_DEBUG,
2092          "Client sent HELLO_ACK when in state %d\n", socket->state);
2093     /* FIXME: Send RESET? */
2094       
2095   }
2096   return GNUNET_OK;
2097 }
2098
2099
2100 /**
2101  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2102  *
2103  * @param cls the closure
2104  * @param tunnel connection to the other end
2105  * @param tunnel_ctx the socket
2106  * @param sender who sent the message
2107  * @param message the actual message
2108  * @param atsi performance data for the connection
2109  * @return GNUNET_OK to keep the connection open,
2110  *         GNUNET_SYSERR to close it (signal serious error)
2111  */
2112 static int
2113 server_handle_reset (void *cls,
2114                      struct GNUNET_MESH_Tunnel *tunnel,
2115                      void **tunnel_ctx,
2116                      const struct GNUNET_PeerIdentity *sender,
2117                      const struct GNUNET_MessageHeader *message,
2118                      const struct GNUNET_ATS_Information*atsi)
2119 {
2120   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2121
2122   return GNUNET_OK;
2123 }
2124
2125
2126 /**
2127  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2128  *
2129  * @param cls the closure
2130  * @param tunnel connection to the other end
2131  * @param tunnel_ctx the socket
2132  * @param sender who sent the message
2133  * @param message the actual message
2134  * @param atsi performance data for the connection
2135  * @return GNUNET_OK to keep the connection open,
2136  *         GNUNET_SYSERR to close it (signal serious error)
2137  */
2138 static int
2139 server_handle_transmit_close (void *cls,
2140                               struct GNUNET_MESH_Tunnel *tunnel,
2141                               void **tunnel_ctx,
2142                               const struct GNUNET_PeerIdentity *sender,
2143                               const struct GNUNET_MessageHeader *message,
2144                               const struct GNUNET_ATS_Information*atsi)
2145 {
2146   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2147
2148   return handle_transmit_close (socket,
2149                                 tunnel,
2150                                 sender,
2151                                 (struct GNUNET_STREAM_MessageHeader *)message,
2152                                 atsi);
2153 }
2154
2155
2156 /**
2157  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2158  *
2159  * @param cls the closure
2160  * @param tunnel connection to the other end
2161  * @param tunnel_ctx the socket
2162  * @param sender who sent the message
2163  * @param message the actual message
2164  * @param atsi performance data for the connection
2165  * @return GNUNET_OK to keep the connection open,
2166  *         GNUNET_SYSERR to close it (signal serious error)
2167  */
2168 static int
2169 server_handle_transmit_close_ack (void *cls,
2170                                   struct GNUNET_MESH_Tunnel *tunnel,
2171                                   void **tunnel_ctx,
2172                                   const struct GNUNET_PeerIdentity *sender,
2173                                   const struct GNUNET_MessageHeader *message,
2174                                   const struct GNUNET_ATS_Information*atsi)
2175 {
2176   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2177
2178   return handle_generic_close_ack (socket,
2179                                    tunnel,
2180                                    sender,
2181                                    (const struct GNUNET_STREAM_MessageHeader *)
2182                                    message,
2183                                    atsi,
2184                                    SHUT_WR);
2185 }
2186
2187
2188 /**
2189  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2190  *
2191  * @param cls the closure
2192  * @param tunnel connection to the other end
2193  * @param tunnel_ctx the socket
2194  * @param sender who sent the message
2195  * @param message the actual message
2196  * @param atsi performance data for the connection
2197  * @return GNUNET_OK to keep the connection open,
2198  *         GNUNET_SYSERR to close it (signal serious error)
2199  */
2200 static int
2201 server_handle_receive_close (void *cls,
2202                              struct GNUNET_MESH_Tunnel *tunnel,
2203                              void **tunnel_ctx,
2204                              const struct GNUNET_PeerIdentity *sender,
2205                              const struct GNUNET_MessageHeader *message,
2206                              const struct GNUNET_ATS_Information*atsi)
2207 {
2208   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2209
2210   return
2211     handle_receive_close (socket,
2212                           tunnel,
2213                           sender,
2214                           (const struct GNUNET_STREAM_MessageHeader *) message,
2215                           atsi);
2216 }
2217
2218
2219 /**
2220  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2221  *
2222  * @param cls the closure
2223  * @param tunnel connection to the other end
2224  * @param tunnel_ctx the socket
2225  * @param sender who sent the message
2226  * @param message the actual message
2227  * @param atsi performance data for the connection
2228  * @return GNUNET_OK to keep the connection open,
2229  *         GNUNET_SYSERR to close it (signal serious error)
2230  */
2231 static int
2232 server_handle_receive_close_ack (void *cls,
2233                                  struct GNUNET_MESH_Tunnel *tunnel,
2234                                  void **tunnel_ctx,
2235                                  const struct GNUNET_PeerIdentity *sender,
2236                                  const struct GNUNET_MessageHeader *message,
2237                                  const struct GNUNET_ATS_Information*atsi)
2238 {
2239   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2240
2241   return handle_generic_close_ack (socket,
2242                                    tunnel,
2243                                    sender,
2244                                    (const struct GNUNET_STREAM_MessageHeader *)
2245                                    message,
2246                                    atsi,
2247                                    SHUT_RD);
2248 }
2249
2250
2251 /**
2252  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2253  *
2254  * @param cls the listen socket (from GNUNET_MESH_connect in
2255  *          GNUNET_STREAM_listen) 
2256  * @param tunnel connection to the other end
2257  * @param tunnel_ctx the socket
2258  * @param sender who sent the message
2259  * @param message the actual message
2260  * @param atsi performance data for the connection
2261  * @return GNUNET_OK to keep the connection open,
2262  *         GNUNET_SYSERR to close it (signal serious error)
2263  */
2264 static int
2265 server_handle_close (void *cls,
2266                      struct GNUNET_MESH_Tunnel *tunnel,
2267                      void **tunnel_ctx,
2268                      const struct GNUNET_PeerIdentity *sender,
2269                      const struct GNUNET_MessageHeader *message,
2270                      const struct GNUNET_ATS_Information*atsi)
2271 {
2272   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2273   
2274   return handle_close (socket,
2275                        tunnel,
2276                        sender,
2277                        (const struct GNUNET_STREAM_MessageHeader *) message,
2278                        atsi);
2279 }
2280
2281
2282 /**
2283  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2284  *
2285  * @param cls the closure
2286  * @param tunnel connection to the other end
2287  * @param tunnel_ctx the socket
2288  * @param sender who sent the message
2289  * @param message the actual message
2290  * @param atsi performance data for the connection
2291  * @return GNUNET_OK to keep the connection open,
2292  *         GNUNET_SYSERR to close it (signal serious error)
2293  */
2294 static int
2295 server_handle_close_ack (void *cls,
2296                          struct GNUNET_MESH_Tunnel *tunnel,
2297                          void **tunnel_ctx,
2298                          const struct GNUNET_PeerIdentity *sender,
2299                          const struct GNUNET_MessageHeader *message,
2300                          const struct GNUNET_ATS_Information*atsi)
2301 {
2302   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2303
2304   return handle_generic_close_ack (socket,
2305                                    tunnel,
2306                                    sender,
2307                                    (const struct GNUNET_STREAM_MessageHeader *) 
2308                                    message,
2309                                    atsi,
2310                                    SHUT_RDWR);
2311 }
2312
2313
2314 /**
2315  * Handler for DATA_ACK messages
2316  *
2317  * @param socket the socket through which the ack was received
2318  * @param tunnel connection to the other end
2319  * @param sender who sent the message
2320  * @param ack the acknowledgment message
2321  * @param atsi performance data for the connection
2322  * @return GNUNET_OK to keep the connection open,
2323  *         GNUNET_SYSERR to close it (signal serious error)
2324  */
2325 static int
2326 handle_ack (struct GNUNET_STREAM_Socket *socket,
2327             struct GNUNET_MESH_Tunnel *tunnel,
2328             const struct GNUNET_PeerIdentity *sender,
2329             const struct GNUNET_STREAM_AckMessage *ack,
2330             const struct GNUNET_ATS_Information*atsi)
2331 {
2332   unsigned int packet;
2333   int need_retransmission;
2334   
2335
2336   if (0 != memcmp (sender,
2337                    &socket->other_peer,
2338                    sizeof (struct GNUNET_PeerIdentity)))
2339   {
2340     LOG (GNUNET_ERROR_TYPE_DEBUG,
2341          "%s: Received ACK from non-confirming peer\n",
2342          GNUNET_i2s (&socket->other_peer));
2343     return GNUNET_YES;
2344   }
2345
2346   switch (socket->state)
2347   {
2348   case (STATE_ESTABLISHED):
2349   case (STATE_RECEIVE_CLOSED):
2350   case (STATE_RECEIVE_CLOSE_WAIT):
2351     if (NULL == socket->write_handle)
2352     {
2353       LOG (GNUNET_ERROR_TYPE_DEBUG,
2354            "%s: Received DATA_ACK when write_handle is NULL\n",
2355            GNUNET_i2s (&socket->other_peer));
2356       return GNUNET_OK;
2357     }
2358     /* FIXME: increment in the base sequence number is breaking current flow
2359      */
2360     if (!((socket->write_sequence_number 
2361            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2362     {
2363       LOG (GNUNET_ERROR_TYPE_DEBUG,
2364            "%s: Received DATA_ACK with unexpected base sequence number\n",
2365            GNUNET_i2s (&socket->other_peer));
2366       LOG (GNUNET_ERROR_TYPE_DEBUG,
2367            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2368            GNUNET_i2s (&socket->other_peer),
2369            socket->write_sequence_number,
2370            ntohl (ack->base_sequence_number));
2371       return GNUNET_OK;
2372     }
2373     /* FIXME: include the case when write_handle is cancelled - ignore the 
2374        acks */
2375
2376     LOG (GNUNET_ERROR_TYPE_DEBUG,
2377          "%s: Received DATA_ACK from %s\n",
2378          GNUNET_i2s (&socket->other_peer),
2379          GNUNET_i2s (&socket->other_peer));
2380       
2381     /* Cancel the retransmission task */
2382     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2383     {
2384       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2385       socket->retransmission_timeout_task_id = 
2386         GNUNET_SCHEDULER_NO_TASK;
2387     }
2388
2389     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2390     {
2391       if (NULL == socket->write_handle->messages[packet]) break;
2392       if (ntohl (ack->base_sequence_number)
2393           >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2394         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2395                               packet,
2396                               GNUNET_YES);
2397       else
2398         if (GNUNET_YES == 
2399             ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2400                                   ntohl (socket->write_handle->messages[packet]->sequence_number)
2401                                   - ntohl (ack->base_sequence_number)))
2402           ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2403                                 packet,
2404                                 GNUNET_YES);
2405     }
2406
2407     /* Update the receive window remaining
2408        FIXME : Should update with the value from a data ack with greater
2409        sequence number */
2410     socket->receiver_window_available = 
2411       ntohl (ack->receive_window_remaining);
2412
2413     /* Check if we have received all acknowledgements */
2414     need_retransmission = GNUNET_NO;
2415     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2416     {
2417       if (NULL == socket->write_handle->messages[packet]) break;
2418       if (GNUNET_YES != ackbitmap_is_bit_set 
2419           (&socket->write_handle->ack_bitmap,packet))
2420       {
2421         need_retransmission = GNUNET_YES;
2422         break;
2423       }
2424     }
2425     if (GNUNET_YES == need_retransmission)
2426     {
2427       write_data (socket);
2428     }
2429     else      /* We have to call the write continuation callback now */
2430     {
2431       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2432       
2433       /* Free the packets */
2434       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2435       {
2436         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2437       }
2438       write_handle = socket->write_handle;
2439       socket->write_handle = NULL;
2440       if (NULL != write_handle->write_cont)
2441         write_handle->write_cont (write_handle->write_cont_cls,
2442                                   socket->status,
2443                                   write_handle->size);
2444       /* We are done with the write handle - Freeing it */
2445       GNUNET_free (write_handle);
2446       LOG (GNUNET_ERROR_TYPE_DEBUG,
2447            "%s: Write completion callback completed\n",
2448            GNUNET_i2s (&socket->other_peer));      
2449     }
2450     break;
2451   default:
2452     break;
2453   }
2454   return GNUNET_OK;
2455 }
2456
2457
2458 /**
2459  * Handler for DATA_ACK messages
2460  *
2461  * @param cls the 'struct GNUNET_STREAM_Socket'
2462  * @param tunnel connection to the other end
2463  * @param tunnel_ctx unused
2464  * @param sender who sent the message
2465  * @param message the actual message
2466  * @param atsi performance data for the connection
2467  * @return GNUNET_OK to keep the connection open,
2468  *         GNUNET_SYSERR to close it (signal serious error)
2469  */
2470 static int
2471 client_handle_ack (void *cls,
2472                    struct GNUNET_MESH_Tunnel *tunnel,
2473                    void **tunnel_ctx,
2474                    const struct GNUNET_PeerIdentity *sender,
2475                    const struct GNUNET_MessageHeader *message,
2476                    const struct GNUNET_ATS_Information*atsi)
2477 {
2478   struct GNUNET_STREAM_Socket *socket = cls;
2479   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2480  
2481   return handle_ack (socket, tunnel, sender, ack, atsi);
2482 }
2483
2484
2485 /**
2486  * Handler for DATA_ACK messages
2487  *
2488  * @param cls the server's listen socket
2489  * @param tunnel connection to the other end
2490  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2491  * @param sender who sent the message
2492  * @param message the actual message
2493  * @param atsi performance data for the connection
2494  * @return GNUNET_OK to keep the connection open,
2495  *         GNUNET_SYSERR to close it (signal serious error)
2496  */
2497 static int
2498 server_handle_ack (void *cls,
2499                    struct GNUNET_MESH_Tunnel *tunnel,
2500                    void **tunnel_ctx,
2501                    const struct GNUNET_PeerIdentity *sender,
2502                    const struct GNUNET_MessageHeader *message,
2503                    const struct GNUNET_ATS_Information*atsi)
2504 {
2505   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2506   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2507  
2508   return handle_ack (socket, tunnel, sender, ack, atsi);
2509 }
2510
2511
2512 /**
2513  * For client message handlers, the stream socket is in the
2514  * closure argument.
2515  */
2516 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2517   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2518   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2519    sizeof (struct GNUNET_STREAM_AckMessage) },
2520   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2521    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2522   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2523    sizeof (struct GNUNET_STREAM_MessageHeader)},
2524   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2525    sizeof (struct GNUNET_STREAM_MessageHeader)},
2526   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2527    sizeof (struct GNUNET_STREAM_MessageHeader)},
2528   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2529    sizeof (struct GNUNET_STREAM_MessageHeader)},
2530   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2531    sizeof (struct GNUNET_STREAM_MessageHeader)},
2532   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2533    sizeof (struct GNUNET_STREAM_MessageHeader)},
2534   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2535    sizeof (struct GNUNET_STREAM_MessageHeader)},
2536   {NULL, 0, 0}
2537 };
2538
2539
2540 /**
2541  * For server message handlers, the stream socket is in the
2542  * tunnel context, and the listen socket in the closure argument.
2543  */
2544 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2545   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2546   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2547    sizeof (struct GNUNET_STREAM_AckMessage) },
2548   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2549    sizeof (struct GNUNET_STREAM_MessageHeader)},
2550   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2551    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2552   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2553    sizeof (struct GNUNET_STREAM_MessageHeader)},
2554   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2555    sizeof (struct GNUNET_STREAM_MessageHeader)},
2556   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2557    sizeof (struct GNUNET_STREAM_MessageHeader)},
2558   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2559    sizeof (struct GNUNET_STREAM_MessageHeader)},
2560   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2561    sizeof (struct GNUNET_STREAM_MessageHeader)},
2562   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2563    sizeof (struct GNUNET_STREAM_MessageHeader)},
2564   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2565    sizeof (struct GNUNET_STREAM_MessageHeader)},
2566   {NULL, 0, 0}
2567 };
2568
2569
2570 /**
2571  * Function called when our target peer is connected to our tunnel
2572  *
2573  * @param cls the socket for which this tunnel is created
2574  * @param peer the peer identity of the target
2575  * @param atsi performance data for the connection
2576  */
2577 static void
2578 mesh_peer_connect_callback (void *cls,
2579                             const struct GNUNET_PeerIdentity *peer,
2580                             const struct GNUNET_ATS_Information * atsi)
2581 {
2582   struct GNUNET_STREAM_Socket *socket = cls;
2583   struct GNUNET_STREAM_MessageHeader *message;
2584   
2585   if (0 != memcmp (peer,
2586                    &socket->other_peer,
2587                    sizeof (struct GNUNET_PeerIdentity)))
2588   {
2589     LOG (GNUNET_ERROR_TYPE_DEBUG,
2590          "%s: A peer which is not our target has connected to our tunnel\n",
2591          GNUNET_i2s(peer));
2592     return;
2593   }
2594   
2595   LOG (GNUNET_ERROR_TYPE_DEBUG,
2596        "%s: Target peer %s connected\n",
2597        GNUNET_i2s (&socket->other_peer),
2598        GNUNET_i2s (&socket->other_peer));
2599   
2600   /* Set state to INIT */
2601   socket->state = STATE_INIT;
2602
2603   /* Send HELLO message */
2604   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2605   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2606   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2607   queue_message (socket,
2608                  message,
2609                  &set_state_hello_wait,
2610                  NULL);
2611
2612   /* Call open callback */
2613   if (NULL == socket->open_cb)
2614   {
2615     LOG (GNUNET_ERROR_TYPE_DEBUG,
2616          "STREAM_open callback is NULL\n");
2617   }
2618 }
2619
2620
2621 /**
2622  * Function called when our target peer is disconnected from our tunnel
2623  *
2624  * @param cls the socket associated which this tunnel
2625  * @param peer the peer identity of the target
2626  */
2627 static void
2628 mesh_peer_disconnect_callback (void *cls,
2629                                const struct GNUNET_PeerIdentity *peer)
2630 {
2631   struct GNUNET_STREAM_Socket *socket=cls;
2632   
2633   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2634   LOG (GNUNET_ERROR_TYPE_DEBUG,
2635        "%s: Other peer %s disconnected \n",
2636        GNUNET_i2s (&socket->other_peer),
2637        GNUNET_i2s (&socket->other_peer));
2638 }
2639
2640
2641 /**
2642  * Method called whenever a peer creates a tunnel to us
2643  *
2644  * @param cls closure
2645  * @param tunnel new handle to the tunnel
2646  * @param initiator peer that started the tunnel
2647  * @param atsi performance information for the tunnel
2648  * @return initial tunnel context for the tunnel
2649  *         (can be NULL -- that's not an error)
2650  */
2651 static void *
2652 new_tunnel_notify (void *cls,
2653                    struct GNUNET_MESH_Tunnel *tunnel,
2654                    const struct GNUNET_PeerIdentity *initiator,
2655                    const struct GNUNET_ATS_Information *atsi)
2656 {
2657   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2658   struct GNUNET_STREAM_Socket *socket;
2659
2660   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2661      from the same peer again until the socket is closed */
2662
2663   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2664   socket->other_peer = *initiator;
2665   socket->tunnel = tunnel;
2666   socket->session_id = 0;       /* FIXME */
2667   socket->state = STATE_INIT;
2668   socket->lsocket = lsocket;
2669   
2670   LOG (GNUNET_ERROR_TYPE_DEBUG,
2671        "%s: Peer %s initiated tunnel to us\n", 
2672        GNUNET_i2s (&socket->other_peer),
2673        GNUNET_i2s (&socket->other_peer));
2674   
2675   /* FIXME: Copy MESH handle from lsocket to socket */
2676   
2677   return socket;
2678 }
2679
2680
2681 /**
2682  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2683  * any associated state.  This function is NOT called if the client has
2684  * explicitly asked for the tunnel to be destroyed using
2685  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2686  * the tunnel.
2687  *
2688  * @param cls closure (set from GNUNET_MESH_connect)
2689  * @param tunnel connection to the other end (henceforth invalid)
2690  * @param tunnel_ctx place where local state associated
2691  *                   with the tunnel is stored
2692  */
2693 static void 
2694 tunnel_cleaner (void *cls,
2695                 const struct GNUNET_MESH_Tunnel *tunnel,
2696                 void *tunnel_ctx)
2697 {
2698   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2699
2700   if (tunnel != socket->tunnel)
2701     return;
2702
2703   GNUNET_break_op(0);
2704   LOG (GNUNET_ERROR_TYPE_DEBUG,
2705        "%s: Peer %s has terminated connection abruptly\n",
2706        GNUNET_i2s (&socket->other_peer),
2707        GNUNET_i2s (&socket->other_peer));
2708
2709   socket->status = GNUNET_STREAM_SHUTDOWN;
2710
2711   /* Clear Transmit handles */
2712   if (NULL != socket->transmit_handle)
2713   {
2714     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2715     socket->transmit_handle = NULL;
2716   }
2717   if (NULL != socket->ack_transmit_handle)
2718   {
2719     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2720     GNUNET_free (socket->ack_msg);
2721     socket->ack_msg = NULL;
2722     socket->ack_transmit_handle = NULL;
2723   }
2724   /* Stop Tasks using socket->tunnel */
2725   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2726   {
2727     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2728     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2729   }
2730   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2731   {
2732     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2733     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2734   }
2735   /* FIXME: Cancel all other tasks using socket->tunnel */
2736   socket->tunnel = NULL;
2737 }
2738
2739
2740 /*****************/
2741 /* API functions */
2742 /*****************/
2743
2744
2745 /**
2746  * Tries to open a stream to the target peer
2747  *
2748  * @param cfg configuration to use
2749  * @param target the target peer to which the stream has to be opened
2750  * @param app_port the application port number which uniquely identifies this
2751  *            stream
2752  * @param open_cb this function will be called after stream has be established 
2753  * @param open_cb_cls the closure for open_cb
2754  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2755  * @return if successful it returns the stream socket; NULL if stream cannot be
2756  *         opened 
2757  */
2758 struct GNUNET_STREAM_Socket *
2759 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2760                     const struct GNUNET_PeerIdentity *target,
2761                     GNUNET_MESH_ApplicationType app_port,
2762                     GNUNET_STREAM_OpenCallback open_cb,
2763                     void *open_cb_cls,
2764                     ...)
2765 {
2766   struct GNUNET_STREAM_Socket *socket;
2767   enum GNUNET_STREAM_Option option;
2768   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2769   va_list vargs;                /* Variable arguments */
2770
2771   LOG (GNUNET_ERROR_TYPE_DEBUG,
2772        "%s\n", __func__);
2773   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2774   socket->other_peer = *target;
2775   socket->open_cb = open_cb;
2776   socket->open_cls = open_cb_cls;
2777   /* Set defaults */
2778   socket->retransmit_timeout = 
2779     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2780   va_start (vargs, open_cb_cls); /* Parse variable args */
2781   do {
2782     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2783     switch (option)
2784     {
2785     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2786       /* Expect struct GNUNET_TIME_Relative */
2787       socket->retransmit_timeout = va_arg (vargs,
2788                                            struct GNUNET_TIME_Relative);
2789       break;
2790     case GNUNET_STREAM_OPTION_END:
2791       break;
2792     }
2793   } while (GNUNET_STREAM_OPTION_END != option);
2794   va_end (vargs);               /* End of variable args parsing */
2795   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2796                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
2797                                       socket, /* cls */
2798                                       NULL, /* No inbound tunnel handler */
2799                                       NULL, /* No in-tunnel cleaner */
2800                                       client_message_handlers,
2801                                       ports); /* We don't get inbound tunnels */
2802   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2803   {
2804     GNUNET_free (socket);
2805     return NULL;
2806   }
2807
2808   /* Now create the mesh tunnel to target */
2809   LOG (GNUNET_ERROR_TYPE_DEBUG,
2810        "Creating MESH Tunnel\n");
2811   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2812                                               NULL, /* Tunnel context */
2813                                               &mesh_peer_connect_callback,
2814                                               &mesh_peer_disconnect_callback,
2815                                               socket);
2816   GNUNET_assert (NULL != socket->tunnel);
2817   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2818                                         &socket->other_peer);
2819   
2820   LOG (GNUNET_ERROR_TYPE_DEBUG,
2821        "%s() END\n", __func__);
2822   return socket;
2823 }
2824
2825
2826 /**
2827  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2828  *
2829  * @param socket the stream socket
2830  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2831  * @param completion_cb the callback that will be called upon successful
2832  *          shutdown of given operation
2833  * @param completion_cls the closure for the completion callback
2834  * @return the shutdown handle
2835  */
2836 struct GNUNET_STREAM_ShutdownHandle *
2837 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2838                         int operation,
2839                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2840                         void *completion_cls)
2841 {
2842   struct GNUNET_STREAM_ShutdownHandle *handle;
2843   struct GNUNET_STREAM_MessageHeader *msg;
2844   
2845   GNUNET_assert (NULL == socket->shutdown_handle);
2846
2847   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2848   handle->socket = socket;
2849   handle->completion_cb = completion_cb;
2850   handle->completion_cls = completion_cls;
2851   socket->shutdown_handle = handle;
2852
2853   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2854   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2855   switch (operation)
2856   {
2857   case SHUT_RD:
2858     handle->operation = SHUT_RD;
2859     if (NULL != socket->read_handle)
2860       LOG (GNUNET_ERROR_TYPE_WARNING,
2861            "Existing read handle should be cancelled before shutting"
2862            " down reading\n");
2863     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2864     queue_message (socket,
2865                    msg,
2866                    &set_state_receive_close_wait,
2867                    NULL);
2868     break;
2869   case SHUT_WR:
2870     handle->operation = SHUT_WR;
2871     if (NULL != socket->write_handle)
2872       LOG (GNUNET_ERROR_TYPE_WARNING,
2873            "Existing write handle should be cancelled before shutting"
2874            " down writing\n");
2875     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2876     queue_message (socket,
2877                    msg,
2878                    &set_state_transmit_close_wait,
2879                    NULL);
2880     break;
2881   case SHUT_RDWR:
2882     handle->operation = SHUT_RDWR;
2883     if (NULL != socket->write_handle)
2884       LOG (GNUNET_ERROR_TYPE_WARNING,
2885            "Existing write handle should be cancelled before shutting"
2886            " down writing\n");
2887     if (NULL != socket->read_handle)
2888       LOG (GNUNET_ERROR_TYPE_WARNING,
2889            "Existing read handle should be cancelled before shutting"
2890            " down reading\n");
2891     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2892     queue_message (socket,
2893                    msg,
2894                    &set_state_close_wait,
2895                    NULL);
2896     break;
2897   default:
2898     LOG (GNUNET_ERROR_TYPE_WARNING,
2899          "GNUNET_STREAM_shutdown called with invalid value for "
2900          "parameter operation -- Ignoring\n");
2901     GNUNET_free (msg);
2902     GNUNET_free (handle);
2903     return NULL;
2904   }
2905   handle->close_msg_retransmission_task_id =
2906     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2907                                   &close_msg_retransmission_task,
2908                                   handle);
2909   return handle;
2910 }
2911
2912
2913 /**
2914  * Cancels a pending shutdown
2915  *
2916  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2917  */
2918 void
2919 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2920 {
2921   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2922     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2923   GNUNET_free (handle);
2924   return;
2925 }
2926
2927
2928 /**
2929  * Closes the stream
2930  *
2931  * @param socket the stream socket
2932  */
2933 void
2934 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2935 {
2936   struct MessageQueue *head;
2937
2938   if (NULL != socket->read_handle)
2939   {
2940     LOG (GNUNET_ERROR_TYPE_WARNING,
2941          "Closing STREAM socket when a read handle is pending\n");
2942   }
2943   if (NULL != socket->write_handle)
2944   {
2945     LOG (GNUNET_ERROR_TYPE_WARNING,
2946          "Closing STREAM socket when a write handle is pending\n");
2947     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2948     //socket->write_handle = NULL;
2949   }
2950
2951   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2952   {
2953     /* socket closed with read task pending!? */
2954     GNUNET_break (0);
2955     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2956     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2957   }
2958   
2959   /* Terminate the ack'ing tasks if they are still present */
2960   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2961   {
2962     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2963     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2964   }
2965
2966   /* Clear Transmit handles */
2967   if (NULL != socket->transmit_handle)
2968   {
2969     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2970     socket->transmit_handle = NULL;
2971   }
2972   if (NULL != socket->ack_transmit_handle)
2973   {
2974     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2975     GNUNET_free (socket->ack_msg);
2976     socket->ack_msg = NULL;
2977     socket->ack_transmit_handle = NULL;
2978   }
2979
2980   /* Clear existing message queue */
2981   while (NULL != (head = socket->queue_head)) {
2982     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2983                                  socket->queue_tail,
2984                                  head);
2985     GNUNET_free (head->message);
2986     GNUNET_free (head);
2987   }
2988
2989   /* Close associated tunnel */
2990   if (NULL != socket->tunnel)
2991   {
2992     GNUNET_MESH_tunnel_destroy (socket->tunnel);
2993     socket->tunnel = NULL;
2994   }
2995
2996   /* Close mesh connection */
2997   if (NULL != socket->mesh && NULL == socket->lsocket)
2998   {
2999     GNUNET_MESH_disconnect (socket->mesh);
3000     socket->mesh = NULL;
3001   }
3002   
3003   /* Release receive buffer */
3004   if (NULL != socket->receive_buffer)
3005   {
3006     GNUNET_free (socket->receive_buffer);
3007   }
3008
3009   GNUNET_free (socket);
3010 }
3011
3012
3013 /**
3014  * Listens for stream connections for a specific application ports
3015  *
3016  * @param cfg the configuration to use
3017  * @param app_port the application port for which new streams will be accepted
3018  * @param listen_cb this function will be called when a peer tries to establish
3019  *            a stream with us
3020  * @param listen_cb_cls closure for listen_cb
3021  * @return listen socket, NULL for any error
3022  */
3023 struct GNUNET_STREAM_ListenSocket *
3024 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3025                       GNUNET_MESH_ApplicationType app_port,
3026                       GNUNET_STREAM_ListenCallback listen_cb,
3027                       void *listen_cb_cls)
3028 {
3029   /* FIXME: Add variable args for passing configration options? */
3030   struct GNUNET_STREAM_ListenSocket *lsocket;
3031   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3032
3033   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3034   lsocket->port = app_port;
3035   lsocket->listen_cb = listen_cb;
3036   lsocket->listen_cb_cls = listen_cb_cls;
3037   lsocket->mesh = GNUNET_MESH_connect (cfg,
3038                                        RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
3039                                        lsocket, /* Closure */
3040                                        &new_tunnel_notify,
3041                                        &tunnel_cleaner,
3042                                        server_message_handlers,
3043                                        ports);
3044   GNUNET_assert (NULL != lsocket->mesh);
3045   return lsocket;
3046 }
3047
3048
3049 /**
3050  * Closes the listen socket
3051  *
3052  * @param lsocket the listen socket
3053  */
3054 void
3055 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3056 {
3057   /* Close MESH connection */
3058   GNUNET_assert (NULL != lsocket->mesh);
3059   GNUNET_MESH_disconnect (lsocket->mesh);
3060   
3061   GNUNET_free (lsocket);
3062 }
3063
3064
3065 /**
3066  * Tries to write the given data to the stream. The maximum size of data that
3067  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3068  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3069  * violation, however only the said number of maximum bytes will be written.
3070  *
3071  * @param socket the socket representing a stream
3072  * @param data the data buffer from where the data is written into the stream
3073  * @param size the number of bytes to be written from the data buffer
3074  * @param timeout the timeout period
3075  * @param write_cont the function to call upon writing some bytes into the
3076  *          stream 
3077  * @param write_cont_cls the closure
3078  *
3079  * @return handle to cancel the operation; if a previous write is pending or
3080  *           the stream has been shutdown for this operation then write_cont is
3081  *           immediately called and NULL is returned.
3082  */
3083 struct GNUNET_STREAM_IOWriteHandle *
3084 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3085                      const void *data,
3086                      size_t size,
3087                      struct GNUNET_TIME_Relative timeout,
3088                      GNUNET_STREAM_CompletionContinuation write_cont,
3089                      void *write_cont_cls)
3090 {
3091   unsigned int num_needed_packets;
3092   unsigned int packet;
3093   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3094   uint32_t packet_size;
3095   uint32_t payload_size;
3096   struct GNUNET_STREAM_DataMessage *data_msg;
3097   const void *sweep;
3098   struct GNUNET_TIME_Relative ack_deadline;
3099
3100   LOG (GNUNET_ERROR_TYPE_DEBUG,
3101        "%s\n", __func__);
3102
3103   /* Return NULL if there is already a write request pending */
3104   if (NULL != socket->write_handle)
3105   {
3106     GNUNET_break (0);
3107     return NULL;
3108   }
3109
3110   switch (socket->state)
3111   {
3112   case STATE_TRANSMIT_CLOSED:
3113   case STATE_TRANSMIT_CLOSE_WAIT:
3114   case STATE_CLOSED:
3115   case STATE_CLOSE_WAIT:
3116     if (NULL != write_cont)
3117       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3118     LOG (GNUNET_ERROR_TYPE_DEBUG,
3119          "%s() END\n", __func__);
3120     return NULL;
3121   case STATE_INIT:
3122   case STATE_LISTEN:
3123   case STATE_HELLO_WAIT:
3124     if (NULL != write_cont)
3125       /* FIXME: GNUNET_STREAM_SYSERR?? */
3126       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3127     LOG (GNUNET_ERROR_TYPE_DEBUG,
3128          "%s() END\n", __func__);
3129     return NULL;
3130   case STATE_ESTABLISHED:
3131   case STATE_RECEIVE_CLOSED:
3132   case STATE_RECEIVE_CLOSE_WAIT:
3133     break;
3134   }
3135
3136   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3137     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3138   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3139   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3140   io_handle->socket = socket;
3141   io_handle->write_cont = write_cont;
3142   io_handle->write_cont_cls = write_cont_cls;
3143   io_handle->size = size;
3144   sweep = data;
3145   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3146      determined from RTT */
3147   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3148   /* Divide the given buffer into packets for sending */
3149   for (packet=0; packet < num_needed_packets; packet++)
3150   {
3151     if ((packet + 1) * max_payload_size < size) 
3152     {
3153       payload_size = max_payload_size;
3154       packet_size = MAX_PACKET_SIZE;
3155     }
3156     else 
3157     {
3158       payload_size = size - packet * max_payload_size;
3159       packet_size =  payload_size + sizeof (struct
3160                                             GNUNET_STREAM_DataMessage); 
3161     }
3162     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3163     io_handle->messages[packet]->header.header.size = htons (packet_size);
3164     io_handle->messages[packet]->header.header.type =
3165       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3166     io_handle->messages[packet]->sequence_number =
3167       htonl (socket->write_sequence_number++);
3168     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3169
3170     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3171        determined from RTT */
3172     io_handle->messages[packet]->ack_deadline =
3173       GNUNET_TIME_relative_hton (ack_deadline);
3174     data_msg = io_handle->messages[packet];
3175     /* Copy data from given buffer to the packet */
3176     memcpy (&data_msg[1],
3177             sweep,
3178             payload_size);
3179     sweep += payload_size;
3180     socket->write_offset += payload_size;
3181   }
3182   socket->write_handle = io_handle;
3183   write_data (socket);
3184
3185   LOG (GNUNET_ERROR_TYPE_DEBUG,
3186        "%s() END\n", __func__);
3187
3188   return io_handle;
3189 }
3190
3191
3192
3193 /**
3194  * Tries to read data from the stream.
3195  *
3196  * @param socket the socket representing a stream
3197  * @param timeout the timeout period
3198  * @param proc function to call with data (once only)
3199  * @param proc_cls the closure for proc
3200  *
3201  * @return handle to cancel the operation; if the stream has been shutdown for
3202  *           this type of opeartion then the DataProcessor is immediately
3203  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3204  */
3205 struct GNUNET_STREAM_IOReadHandle *
3206 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3207                     struct GNUNET_TIME_Relative timeout,
3208                     GNUNET_STREAM_DataProcessor proc,
3209                     void *proc_cls)
3210 {
3211   struct GNUNET_STREAM_IOReadHandle *read_handle;
3212   
3213   LOG (GNUNET_ERROR_TYPE_DEBUG,
3214        "%s: %s()\n", 
3215        GNUNET_i2s (&socket->other_peer),
3216        __func__);
3217
3218   /* Return NULL if there is already a read handle; the user has to cancel that
3219      first before continuing or has to wait until it is completed */
3220   if (NULL != socket->read_handle) return NULL;
3221
3222   GNUNET_assert (NULL != proc);
3223
3224   switch (socket->state)
3225   {
3226   case STATE_RECEIVE_CLOSED:
3227   case STATE_RECEIVE_CLOSE_WAIT:
3228   case STATE_CLOSED:
3229   case STATE_CLOSE_WAIT:
3230     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3231     LOG (GNUNET_ERROR_TYPE_DEBUG,
3232          "%s: %s() END\n",
3233          GNUNET_i2s (&socket->other_peer),
3234          __func__);
3235     return NULL;
3236   default:
3237     break;
3238   }
3239
3240   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3241   read_handle->proc = proc;
3242   read_handle->proc_cls = proc_cls;
3243   socket->read_handle = read_handle;
3244
3245   /* Check if we have a packet at bitmap 0 */
3246   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3247                                           0))
3248   {
3249     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3250                                                      socket);
3251    
3252   }
3253   
3254   /* Setup the read timeout task */
3255   socket->read_io_timeout_task_id =
3256     GNUNET_SCHEDULER_add_delayed (timeout,
3257                                   &read_io_timeout,
3258                                   socket);
3259   LOG (GNUNET_ERROR_TYPE_DEBUG,
3260        "%s: %s() END\n",
3261        GNUNET_i2s (&socket->other_peer),
3262        __func__);
3263   return read_handle;
3264 }
3265
3266
3267 /**
3268  * Cancel pending write operation.
3269  *
3270  * @param ioh handle to operation to cancel
3271  */
3272 void
3273 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3274 {
3275   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3276   unsigned int packet;
3277
3278   GNUNET_assert (NULL != socket->write_handle);
3279   GNUNET_assert (socket->write_handle == ioh);
3280
3281   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3282   {
3283     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3284     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3285   }
3286
3287   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3288   {
3289     if (NULL == ioh->messages[packet]) break;
3290     GNUNET_free (ioh->messages[packet]);
3291   }
3292       
3293   GNUNET_free (socket->write_handle);
3294   socket->write_handle = NULL;
3295   return;
3296 }
3297
3298
3299 /**
3300  * Cancel pending read operation.
3301  *
3302  * @param ioh handle to operation to cancel
3303  */
3304 void
3305 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3306 {
3307   return;
3308 }