logging and indentation
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  *
29  * Add code for write io timeout
30  *
31  * Include retransmission for control messages
32  **/
33
34 /**
35  * @file stream/stream_api.c
36  * @brief Implementation of the stream library
37  * @author Sree Harsha Totakura
38  */
39
40
41 #include "platform.h"
42 #include "gnunet_common.h"
43 #include "gnunet_crypto_lib.h"
44 #include "gnunet_stream_lib.h"
45 #include "gnunet_testing_lib.h"
46 #include "stream_protocol.h"
47
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * The maximum packet size of a stream packet
53  */
54 #define MAX_PACKET_SIZE 64000
55
56 /**
57  * Receive buffer
58  */
59 #define RECEIVE_BUFFER_SIZE 4096000
60
61 /**
62  * The maximum payload a data message packet can carry
63  */
64 static size_t max_payload_size = 
65   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
66
67 /**
68  * states in the Protocol
69  */
70 enum State
71   {
72     /**
73      * Client initialization state
74      */
75     STATE_INIT,
76
77     /**
78      * Listener initialization state 
79      */
80     STATE_LISTEN,
81
82     /**
83      * Pre-connection establishment state
84      */
85     STATE_HELLO_WAIT,
86
87     /**
88      * State where a connection has been established
89      */
90     STATE_ESTABLISHED,
91
92     /**
93      * State where the socket is closed on our side and waiting to be ACK'ed
94      */
95     STATE_RECEIVE_CLOSE_WAIT,
96
97     /**
98      * State where the socket is closed for reading
99      */
100     STATE_RECEIVE_CLOSED,
101
102     /**
103      * State where the socket is closed on our side and waiting to be ACK'ed
104      */
105     STATE_TRANSMIT_CLOSE_WAIT,
106
107     /**
108      * State where the socket is closed for writing
109      */
110     STATE_TRANSMIT_CLOSED,
111
112     /**
113      * State where the socket is closed on our side and waiting to be ACK'ed
114      */
115     STATE_CLOSE_WAIT,
116
117     /**
118      * State where the socket is closed
119      */
120     STATE_CLOSED 
121   };
122
123
124 /**
125  * Functions of this type are called when a message is written
126  *
127  * @param cls the closure from queue_message
128  * @param socket the socket the written message was bound to
129  */
130 typedef void (*SendFinishCallback) (void *cls,
131                                     struct GNUNET_STREAM_Socket *socket);
132
133
134 /**
135  * The send message queue
136  */
137 struct MessageQueue
138 {
139   /**
140    * The message
141    */
142   struct GNUNET_STREAM_MessageHeader *message;
143
144   /**
145    * Callback to be called when the message is sent
146    */
147   SendFinishCallback finish_cb;
148
149   /**
150    * The closure for finish_cb
151    */
152   void *finish_cb_cls;
153
154   /**
155    * The next message in queue. Should be NULL in the last message
156    */
157   struct MessageQueue *next;
158
159   /**
160    * The next message in queue. Should be NULL in the first message
161    */
162   struct MessageQueue *prev;
163 };
164
165
166 /**
167  * The STREAM Socket Handler
168  */
169 struct GNUNET_STREAM_Socket
170 {
171   /**
172    * Retransmission timeout
173    */
174   struct GNUNET_TIME_Relative retransmit_timeout;
175
176   /**
177    * The Acknowledgement Bitmap
178    */
179   GNUNET_STREAM_AckBitmap ack_bitmap;
180
181   /**
182    * Time when the Acknowledgement was queued
183    */
184   struct GNUNET_TIME_Absolute ack_time_registered;
185
186   /**
187    * Queued Acknowledgement deadline
188    */
189   struct GNUNET_TIME_Relative ack_time_deadline;
190
191   /**
192    * The mesh handle
193    */
194   struct GNUNET_MESH_Handle *mesh;
195
196   /**
197    * The mesh tunnel handle
198    */
199   struct GNUNET_MESH_Tunnel *tunnel;
200
201   /**
202    * Stream open closure
203    */
204   void *open_cls;
205
206   /**
207    * Stream open callback
208    */
209   GNUNET_STREAM_OpenCallback open_cb;
210
211   /**
212    * The current transmit handle (if a pending transmit request exists)
213    */
214   struct GNUNET_MESH_TransmitHandle *transmit_handle;
215
216   /**
217    * The current act transmit handle (if a pending ack transmit request exists)
218    */
219   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
220
221   /**
222    * Pointer to the current ack message using in ack_task
223    */
224   struct GNUNET_STREAM_AckMessage *ack_msg;
225
226   /**
227    * The current message associated with the transmit handle
228    */
229   struct MessageQueue *queue_head;
230
231   /**
232    * The queue tail, should always point to the last message in queue
233    */
234   struct MessageQueue *queue_tail;
235
236   /**
237    * The write IO_handle associated with this socket
238    */
239   struct GNUNET_STREAM_IOWriteHandle *write_handle;
240
241   /**
242    * The read IO_handle associated with this socket
243    */
244   struct GNUNET_STREAM_IOReadHandle *read_handle;
245
246   /**
247    * The shutdown handle associated with this socket
248    */
249   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
250
251   /**
252    * Buffer for storing received messages
253    */
254   void *receive_buffer;
255
256   /**
257    * The listen socket from which this socket is derived. Should be NULL if it
258    * is not a derived socket
259    */
260   struct GNUNET_STREAM_ListenSocket *lsocket;
261
262   /**
263    * Task identifier for the read io timeout task
264    */
265   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
266
267   /**
268    * Task identifier for retransmission task after timeout
269    */
270   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
271
272   /**
273    * The task for sending timely Acks
274    */
275   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
276
277   /**
278    * Task scheduled to continue a read operation.
279    */
280   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
281
282   /**
283    * The state of the protocol associated with this socket
284    */
285   enum State state;
286
287   /**
288    * The status of the socket
289    */
290   enum GNUNET_STREAM_Status status;
291
292   /**
293    * The number of previous timeouts; FIXME: currently not used
294    */
295   unsigned int retries;
296
297   /**
298    * The peer identity of the peer at the other end of the stream
299    */
300   GNUNET_PEER_Id other_peer;
301
302   /**
303    * The application port number (type: uint32_t)
304    */
305   GNUNET_MESH_ApplicationType app_port;
306
307   /**
308    * The session id associated with this stream connection
309    * FIXME: Not used currently, may be removed
310    */
311   uint32_t session_id;
312
313   /**
314    * Write sequence number. Set to random when sending HELLO(client) and
315    * HELLO_ACK(server) 
316    */
317   uint32_t write_sequence_number;
318
319   /**
320    * Read sequence number. This number's value is determined during handshake
321    */
322   uint32_t read_sequence_number;
323
324   /**
325    * The receiver buffer size
326    */
327   uint32_t receive_buffer_size;
328
329   /**
330    * The receiver buffer boundaries
331    */
332   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
333
334   /**
335    * receiver's available buffer after the last acknowledged packet
336    */
337   uint32_t receiver_window_available;
338
339   /**
340    * The offset pointer used during write operation
341    */
342   uint32_t write_offset;
343
344   /**
345    * The offset after which we are expecting data
346    */
347   uint32_t read_offset;
348
349   /**
350    * The offset upto which user has read from the received buffer
351    */
352   uint32_t copy_offset;
353 };
354
355
356 /**
357  * A socket for listening
358  */
359 struct GNUNET_STREAM_ListenSocket
360 {
361   /**
362    * The mesh handle
363    */
364   struct GNUNET_MESH_Handle *mesh;
365
366   /**
367    * The callback function which is called after successful opening socket
368    */
369   GNUNET_STREAM_ListenCallback listen_cb;
370
371   /**
372    * The call back closure
373    */
374   void *listen_cb_cls;
375
376   /**
377    * The service port
378    * FIXME: Remove if not required!
379    */
380   GNUNET_MESH_ApplicationType port;
381 };
382
383
384 /**
385  * The IO Write Handle
386  */
387 struct GNUNET_STREAM_IOWriteHandle
388 {
389   /**
390    * The socket to which this write handle is associated
391    */
392   struct GNUNET_STREAM_Socket *socket;
393
394   /**
395    * The packet_buffers associated with this Handle
396    */
397   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
398
399   /**
400    * The write continuation callback
401    */
402   GNUNET_STREAM_CompletionContinuation write_cont;
403
404   /**
405    * Write continuation closure
406    */
407   void *write_cont_cls;
408
409   /**
410    * The bitmap of this IOHandle; Corresponding bit for a message is set when
411    * it has been acknowledged by the receiver
412    */
413   GNUNET_STREAM_AckBitmap ack_bitmap;
414
415   /**
416    * Number of bytes in this write handle
417    */
418   size_t size;
419 };
420
421
422 /**
423  * The IO Read Handle
424  */
425 struct GNUNET_STREAM_IOReadHandle
426 {
427   /**
428    * Callback for the read processor
429    */
430   GNUNET_STREAM_DataProcessor proc;
431
432   /**
433    * The closure pointer for the read processor callback
434    */
435   void *proc_cls;
436 };
437
438
439 /**
440  * Handle for Shutdown
441  */
442 struct GNUNET_STREAM_ShutdownHandle
443 {
444   /**
445    * The socket associated with this shutdown handle
446    */
447   struct GNUNET_STREAM_Socket *socket;
448
449   /**
450    * Shutdown completion callback
451    */
452   GNUNET_STREAM_ShutdownCompletion completion_cb;
453
454   /**
455    * Closure for completion callback
456    */
457   void *completion_cls;
458
459   /**
460    * Close message retransmission task id
461    */
462   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
463
464   /**
465    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
466    */
467   int operation;  
468 };
469
470
471 /**
472  * Default value in seconds for various timeouts
473  */
474 static unsigned int default_timeout = 10;
475
476
477 /**
478  * Callback function for sending queued message
479  *
480  * @param cls closure the socket
481  * @param size number of bytes available in buf
482  * @param buf where the callee should write the message
483  * @return number of bytes written to buf
484  */
485 static size_t
486 send_message_notify (void *cls, size_t size, void *buf)
487 {
488   struct GNUNET_STREAM_Socket *socket = cls;
489   struct GNUNET_PeerIdentity target;
490   struct MessageQueue *head;
491   size_t ret;
492
493   socket->transmit_handle = NULL; /* Remove the transmit handle */
494   head = socket->queue_head;
495   if (NULL == head)
496     return 0; /* just to be safe */
497   GNUNET_PEER_resolve (socket->other_peer, &target);
498   if (0 == size)                /* request timed out */
499   {
500     socket->retries++;
501     LOG (GNUNET_ERROR_TYPE_DEBUG,
502          "Message sending timed out. Retry %d \n",
503          socket->retries);
504     socket->transmit_handle = 
505       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
506                                          0, /* Corking */
507                                          1, /* Priority */
508                                          /* FIXME: exponential backoff */
509                                          socket->retransmit_timeout,
510                                          &target,
511                                          ntohs (head->message->header.size),
512                                          &send_message_notify,
513                                          socket);
514     return 0;
515   }
516
517   ret = ntohs (head->message->header.size);
518   GNUNET_assert (size >= ret);
519   memcpy (buf, head->message, ret);
520   if (NULL != head->finish_cb)
521   {
522     head->finish_cb (head->finish_cb_cls, socket);
523   }
524   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
525                                socket->queue_tail,
526                                head);
527   GNUNET_free (head->message);
528   GNUNET_free (head);
529   head = socket->queue_head;
530   if (NULL != head)    /* more pending messages to send */
531   {
532     socket->retries = 0;
533     socket->transmit_handle = 
534       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
535                                          0, /* Corking */
536                                          1, /* Priority */
537                                          /* FIXME: exponential backoff */
538                                          socket->retransmit_timeout,
539                                          &target,
540                                          ntohs (head->message->header.size),
541                                          &send_message_notify,
542                                          socket);
543   }
544   return ret;
545 }
546
547
548 /**
549  * Queues a message for sending using the mesh connection of a socket
550  *
551  * @param socket the socket whose mesh connection is used
552  * @param message the message to be sent
553  * @param finish_cb the callback to be called when the message is sent
554  * @param finish_cb_cls the closure for the callback
555  */
556 static void
557 queue_message (struct GNUNET_STREAM_Socket *socket,
558                struct GNUNET_STREAM_MessageHeader *message,
559                SendFinishCallback finish_cb,
560                void *finish_cb_cls)
561 {
562   struct MessageQueue *queue_entity;
563   struct GNUNET_PeerIdentity target;
564
565   GNUNET_assert 
566     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
567      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
568
569   LOG (GNUNET_ERROR_TYPE_DEBUG,
570        "Queueing message of type %d and size %d\n",
571        ntohs (message->header.type),
572        ntohs (message->header.size));
573   GNUNET_assert (NULL != message);
574   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
575   queue_entity->message = message;
576   queue_entity->finish_cb = finish_cb;
577   queue_entity->finish_cb_cls = finish_cb_cls;
578   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
579                                     socket->queue_tail,
580                                     queue_entity);
581   if (NULL == socket->transmit_handle)
582   {
583     socket->retries = 0;
584     GNUNET_PEER_resolve (socket->other_peer, &target);
585     socket->transmit_handle = 
586       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
587                                          0, /* Corking */
588                                          1, /* Priority */
589                                          socket->retransmit_timeout,
590                                          &target,
591                                          ntohs (message->header.size),
592                                          &send_message_notify,
593                                          socket);
594   }
595 }
596
597
598 /**
599  * Copies a message and queues it for sending using the mesh connection of
600  * given socket 
601  *
602  * @param socket the socket whose mesh connection is used
603  * @param message the message to be sent
604  * @param finish_cb the callback to be called when the message is sent
605  * @param finish_cb_cls the closure for the callback
606  */
607 static void
608 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
609                         const struct GNUNET_STREAM_MessageHeader *message,
610                         SendFinishCallback finish_cb,
611                         void *finish_cb_cls)
612 {
613   struct GNUNET_STREAM_MessageHeader *msg_copy;
614   uint16_t size;
615   
616   size = ntohs (message->header.size);
617   msg_copy = GNUNET_malloc (size);
618   memcpy (msg_copy, message, size);
619   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
620 }
621
622
623 /**
624  * Callback function for sending ack message
625  *
626  * @param cls closure the ACK message created in ack_task
627  * @param size number of bytes available in buffer
628  * @param buf where the callee should write the message
629  * @return number of bytes written to buf
630  */
631 static size_t
632 send_ack_notify (void *cls, size_t size, void *buf)
633 {
634   struct GNUNET_STREAM_Socket *socket = cls;
635
636   if (0 == size)
637   {
638     LOG (GNUNET_ERROR_TYPE_DEBUG,
639          "%s called with size 0\n", __func__);
640     return 0;
641   }
642   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
643   
644   size = ntohs (socket->ack_msg->header.header.size);
645   memcpy (buf, socket->ack_msg, size);
646   
647   GNUNET_free (socket->ack_msg);
648   socket->ack_msg = NULL;
649   socket->ack_transmit_handle = NULL;
650   return size;
651 }
652
653 /**
654  * Writes data using the given socket. The amount of data written is limited by
655  * the receiver_window_size
656  *
657  * @param socket the socket to use
658  */
659 static void 
660 write_data (struct GNUNET_STREAM_Socket *socket);
661
662 /**
663  * Task for retransmitting data messages if they aren't ACK before their ack
664  * deadline 
665  *
666  * @param cls the socket
667  * @param tc the Task context
668  */
669 static void
670 retransmission_timeout_task (void *cls,
671                              const struct GNUNET_SCHEDULER_TaskContext *tc)
672 {
673   struct GNUNET_STREAM_Socket *socket = cls;
674   
675   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
676     return;
677
678   LOG (GNUNET_ERROR_TYPE_DEBUG,
679        "Retransmitting DATA...\n");
680   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
681   write_data (socket);
682 }
683
684
685 /**
686  * Task for sending ACK message
687  *
688  * @param cls the socket
689  * @param tc the Task context
690  */
691 static void
692 ack_task (void *cls,
693           const struct GNUNET_SCHEDULER_TaskContext *tc)
694 {
695   struct GNUNET_STREAM_Socket *socket = cls;
696   struct GNUNET_STREAM_AckMessage *ack_msg;
697   struct GNUNET_PeerIdentity target;
698
699   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
700   {
701     return;
702   }
703
704   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
705
706   /* Create the ACK Message */
707   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
708   ack_msg->header.header.size = htons (sizeof (struct 
709                                                GNUNET_STREAM_AckMessage));
710   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
711   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
712   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
713   ack_msg->receive_window_remaining = 
714     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
715
716   socket->ack_msg = ack_msg;
717   GNUNET_PEER_resolve (socket->other_peer, &target);
718   /* Request MESH for sending ACK */
719   socket->ack_transmit_handle = 
720     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
721                                        0, /* Corking */
722                                        1, /* Priority */
723                                        socket->retransmit_timeout,
724                                        &target,
725                                        ntohs (ack_msg->header.header.size),
726                                        &send_ack_notify,
727                                        socket);
728 }
729
730
731 /**
732  * Retransmission task for shutdown messages
733  *
734  * @param cls the shutdown handle
735  * @param tc the Task Context
736  */
737 static void
738 close_msg_retransmission_task (void *cls,
739                                const struct GNUNET_SCHEDULER_TaskContext *tc)
740 {
741   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
742   struct GNUNET_STREAM_MessageHeader *msg;
743   struct GNUNET_STREAM_Socket *socket;
744
745   GNUNET_assert (NULL != shutdown_handle);
746   socket = shutdown_handle->socket;
747
748   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
749   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
750   switch (shutdown_handle->operation)
751   {
752   case SHUT_RDWR:
753     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
754     break;
755   case SHUT_RD:
756     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
757     break;
758   case SHUT_WR:
759     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
760     break;
761   default:
762     GNUNET_free (msg);
763     shutdown_handle->close_msg_retransmission_task_id = 
764       GNUNET_SCHEDULER_NO_TASK;
765     return;
766   }
767   queue_message (socket, msg, NULL, NULL);
768   shutdown_handle->close_msg_retransmission_task_id =
769     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
770                                   &close_msg_retransmission_task,
771                                   shutdown_handle);
772 }
773
774
775 /**
776  * Function to modify a bit in GNUNET_STREAM_AckBitmap
777  *
778  * @param bitmap the bitmap to modify
779  * @param bit the bit number to modify
780  * @param value GNUNET_YES to on, GNUNET_NO to off
781  */
782 static void
783 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
784                       unsigned int bit, 
785                       int value)
786 {
787   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
788   if (GNUNET_YES == value)
789     *bitmap |= (1LL << bit);
790   else
791     *bitmap &= ~(1LL << bit);
792 }
793
794
795 /**
796  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
797  *
798  * @param bitmap address of the bitmap that has to be checked
799  * @param bit the bit number to check
800  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
801  */
802 static uint8_t
803 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
804                       unsigned int bit)
805 {
806   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
807   return 0 != (*bitmap & (1LL << bit));
808 }
809
810
811 /**
812  * Writes data using the given socket. The amount of data written is limited by
813  * the receiver_window_size
814  *
815  * @param socket the socket to use
816  */
817 static void 
818 write_data (struct GNUNET_STREAM_Socket *socket)
819 {
820   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
821   int packet;                   /* Although an int, should never be negative */
822   int ack_packet;
823
824   ack_packet = -1;
825   /* Find the last acknowledged packet */
826   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
827   {      
828     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
829                                             packet))
830       ack_packet = packet;        
831     else if (NULL == io_handle->messages[packet])
832       break;
833   }
834   /* Resend packets which weren't ack'ed */
835   for (packet=0; packet < ack_packet; packet++)
836   {
837     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
838                                            packet))
839     {
840       LOG (GNUNET_ERROR_TYPE_DEBUG,
841            "Placing DATA message with sequence %u in send queue\n",
842            ntohl (io_handle->messages[packet]->sequence_number));
843
844       copy_and_queue_message (socket,
845                               &io_handle->messages[packet]->header,
846                               NULL,
847                               NULL);
848     }
849   }
850   packet = ack_packet + 1;
851   /* Now send new packets if there is enough buffer space */
852   while ( (NULL != io_handle->messages[packet]) &&
853           (socket->receiver_window_available 
854            >= ntohs (io_handle->messages[packet]->header.header.size)) )
855   {
856     socket->receiver_window_available -= 
857       ntohs (io_handle->messages[packet]->header.header.size);
858     LOG (GNUNET_ERROR_TYPE_DEBUG,
859          "Placing DATA message with sequence %u in send queue\n",
860          ntohl (io_handle->messages[packet]->sequence_number));
861     copy_and_queue_message (socket,
862                             &io_handle->messages[packet]->header,
863                             NULL,
864                             NULL);
865     packet++;
866   }
867
868   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
869     socket->retransmission_timeout_task_id = 
870       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
871                                     (GNUNET_TIME_UNIT_SECONDS, 8),
872                                     &retransmission_timeout_task,
873                                     socket);
874 }
875
876
877 /**
878  * Task for calling the read processor
879  *
880  * @param cls the socket
881  * @param tc the task context
882  */
883 static void
884 call_read_processor (void *cls,
885                      const struct GNUNET_SCHEDULER_TaskContext *tc)
886 {
887   struct GNUNET_STREAM_Socket *socket = cls;
888   size_t read_size;
889   size_t valid_read_size;
890   unsigned int packet;
891   uint32_t sequence_increase;
892   uint32_t offset_increase;
893
894   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
895   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
896     return;
897
898   if (NULL == socket->receive_buffer) 
899     return;
900
901   GNUNET_assert (NULL != socket->read_handle);
902   GNUNET_assert (NULL != socket->read_handle->proc);
903
904   /* Check the bitmap for any holes */
905   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
906   {
907     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
908                                            packet))
909       break;
910   }
911   /* We only call read processor if we have the first packet */
912   GNUNET_assert (0 < packet);
913
914   valid_read_size = 
915     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
916
917   GNUNET_assert (0 != valid_read_size);
918
919   /* Cancel the read_io_timeout_task */
920   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
921   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
922
923   /* Call the data processor */
924   LOG (GNUNET_ERROR_TYPE_DEBUG,
925        "Calling read processor\n");
926   read_size = 
927     socket->read_handle->proc (socket->read_handle->proc_cls,
928                                socket->status,
929                                socket->receive_buffer + socket->copy_offset,
930                                valid_read_size);
931   LOG (GNUNET_ERROR_TYPE_DEBUG,
932        "Read processor read %d bytes\n",
933        read_size);
934   LOG (GNUNET_ERROR_TYPE_DEBUG,
935        "Read processor completed successfully\n");
936
937   /* Free the read handle */
938   GNUNET_free (socket->read_handle);
939   socket->read_handle = NULL;
940
941   GNUNET_assert (read_size <= valid_read_size);
942   socket->copy_offset += read_size;
943
944   /* Determine upto which packet we can remove from the buffer */
945   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
946   {
947     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
948     { packet++; break; }
949     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
950       break;
951   }
952
953   /* If no packets can be removed we can't move the buffer */
954   if (0 == packet) return;
955
956   sequence_increase = packet;
957   LOG (GNUNET_ERROR_TYPE_DEBUG,
958        "Sequence increase after read processor completion: %u\n",
959        sequence_increase);
960
961   /* Shift the data in the receive buffer */
962   memmove (socket->receive_buffer,
963            socket->receive_buffer 
964            + socket->receive_buffer_boundaries[sequence_increase-1],
965            socket->receive_buffer_size
966            - socket->receive_buffer_boundaries[sequence_increase-1]);
967   
968   /* Shift the bitmap */
969   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
970   
971   /* Set read_sequence_number */
972   socket->read_sequence_number += sequence_increase;
973   
974   /* Set read_offset */
975   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
976   socket->read_offset += offset_increase;
977
978   /* Fix copy_offset */
979   GNUNET_assert (offset_increase <= socket->copy_offset);
980   socket->copy_offset -= offset_increase;
981   
982   /* Fix relative boundaries */
983   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
984   {
985     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
986     {
987       socket->receive_buffer_boundaries[packet] = 
988         socket->receive_buffer_boundaries[packet + sequence_increase] 
989         - offset_increase;
990     }
991     else
992       socket->receive_buffer_boundaries[packet] = 0;
993   }
994 }
995
996
997 /**
998  * Cancels the existing read io handle
999  *
1000  * @param cls the closure from the SCHEDULER call
1001  * @param tc the task context
1002  */
1003 static void
1004 read_io_timeout (void *cls, 
1005                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1006 {
1007   struct GNUNET_STREAM_Socket *socket = cls;
1008   GNUNET_STREAM_DataProcessor proc;
1009   void *proc_cls;
1010
1011   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1012   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1013   {
1014     LOG (GNUNET_ERROR_TYPE_DEBUG,
1015          "Read task timedout - Cancelling it\n");
1016     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1017     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1018   }
1019   GNUNET_assert (NULL != socket->read_handle);
1020   proc = socket->read_handle->proc;
1021   proc_cls = socket->read_handle->proc_cls;
1022
1023   GNUNET_free (socket->read_handle);
1024   socket->read_handle = NULL;
1025   /* Call the read processor to signal timeout */
1026   proc (proc_cls,
1027         GNUNET_STREAM_TIMEOUT,
1028         NULL,
1029         0);
1030 }
1031
1032
1033 /**
1034  * Handler for DATA messages; Same for both client and server
1035  *
1036  * @param socket the socket through which the ack was received
1037  * @param tunnel connection to the other end
1038  * @param sender who sent the message
1039  * @param msg the data message
1040  * @param atsi performance data for the connection
1041  * @return GNUNET_OK to keep the connection open,
1042  *         GNUNET_SYSERR to close it (signal serious error)
1043  */
1044 static int
1045 handle_data (struct GNUNET_STREAM_Socket *socket,
1046              struct GNUNET_MESH_Tunnel *tunnel,
1047              const struct GNUNET_PeerIdentity *sender,
1048              const struct GNUNET_STREAM_DataMessage *msg,
1049              const struct GNUNET_ATS_Information*atsi)
1050 {
1051   const void *payload;
1052   uint32_t bytes_needed;
1053   uint32_t relative_offset;
1054   uint32_t relative_sequence_number;
1055   uint16_t size;
1056
1057   size = htons (msg->header.header.size);
1058   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1059   {
1060     GNUNET_break_op (0);
1061     return GNUNET_SYSERR;
1062   }
1063
1064   if (GNUNET_PEER_search (sender) != socket->other_peer)
1065   {
1066     LOG (GNUNET_ERROR_TYPE_DEBUG,
1067          "Received DATA from non-confirming peer\n");
1068     return GNUNET_YES;
1069   }
1070
1071   switch (socket->state)
1072   {
1073   case STATE_ESTABLISHED:
1074   case STATE_TRANSMIT_CLOSED:
1075   case STATE_TRANSMIT_CLOSE_WAIT:
1076       
1077     /* check if the message's sequence number is in the range we are
1078        expecting */
1079     relative_sequence_number = 
1080       ntohl (msg->sequence_number) - socket->read_sequence_number;
1081     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1082     {
1083       LOG (GNUNET_ERROR_TYPE_DEBUG,
1084            "Ignoring received message with sequence number %u\n",
1085            ntohl (msg->sequence_number));
1086       /* Start ACK sending task if one is not already present */
1087       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1088       {
1089         socket->ack_task_id = 
1090           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1091                                         (msg->ack_deadline),
1092                                         &ack_task,
1093                                         socket);
1094       }
1095       return GNUNET_YES;
1096     }
1097       
1098     /* Check if we have already seen this message */
1099     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1100                                             relative_sequence_number))
1101     {
1102       LOG (GNUNET_ERROR_TYPE_DEBUG,
1103            "Ignoring already received message with sequence "
1104            "number %u\n",
1105            ntohl (msg->sequence_number));
1106       /* Start ACK sending task if one is not already present */
1107       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1108       {
1109         socket->ack_task_id = 
1110           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1111                                         (msg->ack_deadline),
1112                                         &ack_task,
1113                                         socket);
1114       }
1115       return GNUNET_YES;
1116     }
1117
1118     LOG (GNUNET_ERROR_TYPE_DEBUG,
1119          "Receiving DATA with sequence number: %u and size: %d from %x\n",
1120          ntohl (msg->sequence_number),
1121          ntohs (msg->header.header.size),
1122          socket->other_peer);
1123
1124     /* Check if we have to allocate the buffer */
1125     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1126     relative_offset = ntohl (msg->offset) - socket->read_offset;
1127     bytes_needed = relative_offset + size;
1128     if (bytes_needed > socket->receive_buffer_size)
1129     {
1130       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1131       {
1132         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1133                                                  bytes_needed);
1134         socket->receive_buffer_size = bytes_needed;
1135       }
1136       else
1137       {
1138         LOG (GNUNET_ERROR_TYPE_DEBUG,
1139              "Cannot accommodate packet %d as buffer is full\n",
1140              ntohl (msg->sequence_number));
1141         return GNUNET_YES;
1142       }
1143     }
1144       
1145     /* Copy Data to buffer */
1146     payload = &msg[1];
1147     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1148     memcpy (socket->receive_buffer + relative_offset,
1149             payload,
1150             size);
1151     socket->receive_buffer_boundaries[relative_sequence_number] = 
1152       relative_offset + size;
1153       
1154     /* Modify the ACK bitmap */
1155     ackbitmap_modify_bit (&socket->ack_bitmap,
1156                           relative_sequence_number,
1157                           GNUNET_YES);
1158
1159     /* Start ACK sending task if one is not already present */
1160     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1161     {
1162       socket->ack_task_id = 
1163         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1164                                       (msg->ack_deadline),
1165                                       &ack_task,
1166                                       socket);
1167     }
1168
1169     if ((NULL != socket->read_handle) /* A read handle is waiting */
1170         /* There is no current read task */
1171         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1172         /* We have the first packet */
1173         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1174                                                0)))
1175     {
1176       LOG (GNUNET_ERROR_TYPE_DEBUG,
1177            "Scheduling read processor\n");
1178
1179       socket->read_task_id = 
1180         GNUNET_SCHEDULER_add_now (&call_read_processor,
1181                                   socket);
1182     }
1183       
1184     break;
1185
1186   default:
1187     LOG (GNUNET_ERROR_TYPE_DEBUG,
1188          "Received data message when it cannot be handled\n");
1189     break;
1190   }
1191   return GNUNET_YES;
1192 }
1193
1194
1195 /**
1196  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1197  *
1198  * @param cls the socket (set from GNUNET_MESH_connect)
1199  * @param tunnel connection to the other end
1200  * @param tunnel_ctx place to store local state associated with the tunnel
1201  * @param sender who sent the message
1202  * @param message the actual message
1203  * @param atsi performance data for the connection
1204  * @return GNUNET_OK to keep the connection open,
1205  *         GNUNET_SYSERR to close it (signal serious error)
1206  */
1207 static int
1208 client_handle_data (void *cls,
1209                     struct GNUNET_MESH_Tunnel *tunnel,
1210                     void **tunnel_ctx,
1211                     const struct GNUNET_PeerIdentity *sender,
1212                     const struct GNUNET_MessageHeader *message,
1213                     const struct GNUNET_ATS_Information*atsi)
1214 {
1215   struct GNUNET_STREAM_Socket *socket = cls;
1216
1217   return handle_data (socket, 
1218                       tunnel, 
1219                       sender, 
1220                       (const struct GNUNET_STREAM_DataMessage *) message, 
1221                       atsi);
1222 }
1223
1224
1225 /**
1226  * Callback to set state to ESTABLISHED
1227  *
1228  * @param cls the closure from queue_message FIXME: document
1229  * @param socket the socket to requiring state change
1230  */
1231 static void
1232 set_state_established (void *cls,
1233                        struct GNUNET_STREAM_Socket *socket)
1234 {
1235   struct GNUNET_PeerIdentity initiator_pid;
1236
1237   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1238        "Attaining ESTABLISHED state\n");
1239   socket->write_offset = 0;
1240   socket->read_offset = 0;
1241   socket->state = STATE_ESTABLISHED;
1242   /* FIXME: What if listen_cb is NULL */
1243   if (NULL != socket->lsocket)
1244   {
1245     GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1246     LOG (GNUNET_ERROR_TYPE_DEBUG,
1247          "Calling listen callback\n");
1248     if (GNUNET_SYSERR == 
1249         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1250                                     socket,
1251                                     &initiator_pid))
1252     {
1253       socket->state = STATE_CLOSED;
1254       /* FIXME: We should close in a decent way */
1255       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1256       GNUNET_free (socket);
1257     }
1258   }
1259   else if (socket->open_cb)
1260     socket->open_cb (socket->open_cls, socket);
1261 }
1262
1263
1264 /**
1265  * Callback to set state to HELLO_WAIT
1266  *
1267  * @param cls the closure from queue_message
1268  * @param socket the socket to requiring state change
1269  */
1270 static void
1271 set_state_hello_wait (void *cls,
1272                       struct GNUNET_STREAM_Socket *socket)
1273 {
1274   GNUNET_assert (STATE_INIT == socket->state);
1275   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1276        "Attaining HELLO_WAIT state\n");
1277   socket->state = STATE_HELLO_WAIT;
1278 }
1279
1280
1281 /**
1282  * Callback to set state to CLOSE_WAIT
1283  *
1284  * @param cls the closure from queue_message
1285  * @param socket the socket requiring state change
1286  */
1287 static void
1288 set_state_close_wait (void *cls,
1289                       struct GNUNET_STREAM_Socket *socket)
1290 {
1291   LOG (GNUNET_ERROR_TYPE_DEBUG,
1292        "Attaing CLOSE_WAIT state\n");
1293   socket->state = STATE_CLOSE_WAIT;
1294   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1295   socket->receive_buffer = NULL;
1296   socket->receive_buffer_size = 0;
1297 }
1298
1299
1300 /**
1301  * Callback to set state to RECEIVE_CLOSE_WAIT
1302  *
1303  * @param cls the closure from queue_message
1304  * @param socket the socket requiring state change
1305  */
1306 static void
1307 set_state_receive_close_wait (void *cls,
1308                               struct GNUNET_STREAM_Socket *socket)
1309 {
1310   LOG (GNUNET_ERROR_TYPE_DEBUG,
1311        "Attaing RECEIVE_CLOSE_WAIT state\n");
1312   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1313   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1314   socket->receive_buffer = NULL;
1315   socket->receive_buffer_size = 0;
1316 }
1317
1318
1319 /**
1320  * Callback to set state to TRANSMIT_CLOSE_WAIT
1321  *
1322  * @param cls the closure from queue_message
1323  * @param socket the socket requiring state change
1324  */
1325 static void
1326 set_state_transmit_close_wait (void *cls,
1327                                struct GNUNET_STREAM_Socket *socket)
1328 {
1329   LOG (GNUNET_ERROR_TYPE_DEBUG,
1330        "Attaining TRANSMIT_CLOSE_WAIT state\n");
1331   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1332 }
1333
1334
1335 /**
1336  * Callback to set state to CLOSED
1337  *
1338  * @param cls the closure from queue_message
1339  * @param socket the socket requiring state change
1340  */
1341 static void
1342 set_state_closed (void *cls,
1343                   struct GNUNET_STREAM_Socket *socket)
1344 {
1345   socket->state = STATE_CLOSED;
1346 }
1347
1348 /**
1349  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1350  * socket
1351  *
1352  * @param socket the socket for which this HelloAckMessage has to be generated
1353  * @return the HelloAckMessage
1354  */
1355 static struct GNUNET_STREAM_HelloAckMessage *
1356 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1357 {
1358   struct GNUNET_STREAM_HelloAckMessage *msg;
1359
1360   /* Get the random sequence number */
1361   socket->write_sequence_number = 
1362     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1363   LOG (GNUNET_ERROR_TYPE_DEBUG,
1364        "Generated write sequence number %u\n",
1365        (unsigned int) socket->write_sequence_number);
1366   
1367   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1368   msg->header.header.size = 
1369     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1370   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1371   msg->sequence_number = htonl (socket->write_sequence_number);
1372   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1373
1374   return msg;
1375 }
1376
1377
1378 /**
1379  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1380  *
1381  * @param cls the socket (set from GNUNET_MESH_connect)
1382  * @param tunnel connection to the other end
1383  * @param tunnel_ctx this is NULL
1384  * @param sender who sent the message
1385  * @param message the actual message
1386  * @param atsi performance data for the connection
1387  * @return GNUNET_OK to keep the connection open,
1388  *         GNUNET_SYSERR to close it (signal serious error)
1389  */
1390 static int
1391 client_handle_hello_ack (void *cls,
1392                          struct GNUNET_MESH_Tunnel *tunnel,
1393                          void **tunnel_ctx,
1394                          const struct GNUNET_PeerIdentity *sender,
1395                          const struct GNUNET_MessageHeader *message,
1396                          const struct GNUNET_ATS_Information*atsi)
1397 {
1398   struct GNUNET_STREAM_Socket *socket = cls;
1399   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1400   struct GNUNET_STREAM_HelloAckMessage *reply;
1401
1402   if (GNUNET_PEER_search (sender) != socket->other_peer)
1403   {
1404     LOG (GNUNET_ERROR_TYPE_DEBUG,
1405          "Received HELLO_ACK from non-confirming peer\n");
1406     return GNUNET_YES;
1407   }
1408   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1409   LOG (GNUNET_ERROR_TYPE_DEBUG,
1410        "Received HELLO_ACK from %x\n",
1411        socket->other_peer);
1412
1413   GNUNET_assert (socket->tunnel == tunnel);
1414   switch (socket->state)
1415   {
1416   case STATE_HELLO_WAIT:
1417     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1418     LOG (GNUNET_ERROR_TYPE_DEBUG,
1419          "Read sequence number %u\n",
1420          (unsigned int) socket->read_sequence_number);
1421     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1422     reply = generate_hello_ack_msg (socket);
1423     queue_message (socket,
1424                    &reply->header, 
1425                    &set_state_established, 
1426                    NULL);      
1427     return GNUNET_OK;
1428   case STATE_ESTABLISHED:
1429   case STATE_RECEIVE_CLOSE_WAIT:
1430     // call statistics (# ACKs ignored++)
1431     return GNUNET_OK;
1432   case STATE_INIT:
1433   default:
1434     LOG (GNUNET_ERROR_TYPE_DEBUG,
1435          "Server %x sent HELLO_ACK when in state %d\n", 
1436          socket->other_peer,
1437          socket->state);
1438     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1439     return GNUNET_SYSERR;
1440   }
1441
1442 }
1443
1444
1445 /**
1446  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1447  *
1448  * @param cls the socket (set from GNUNET_MESH_connect)
1449  * @param tunnel connection to the other end
1450  * @param tunnel_ctx this is NULL
1451  * @param sender who sent the message
1452  * @param message the actual message
1453  * @param atsi performance data for the connection
1454  * @return GNUNET_OK to keep the connection open,
1455  *         GNUNET_SYSERR to close it (signal serious error)
1456  */
1457 static int
1458 client_handle_reset (void *cls,
1459                      struct GNUNET_MESH_Tunnel *tunnel,
1460                      void **tunnel_ctx,
1461                      const struct GNUNET_PeerIdentity *sender,
1462                      const struct GNUNET_MessageHeader *message,
1463                      const struct GNUNET_ATS_Information*atsi)
1464 {
1465   // struct GNUNET_STREAM_Socket *socket = cls;
1466
1467   return GNUNET_OK;
1468 }
1469
1470
1471 /**
1472  * Common message handler for handling TRANSMIT_CLOSE messages
1473  *
1474  * @param socket the socket through which the ack was received
1475  * @param tunnel connection to the other end
1476  * @param sender who sent the message
1477  * @param msg the transmit close message
1478  * @param atsi performance data for the connection
1479  * @return GNUNET_OK to keep the connection open,
1480  *         GNUNET_SYSERR to close it (signal serious error)
1481  */
1482 static int
1483 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1484                        struct GNUNET_MESH_Tunnel *tunnel,
1485                        const struct GNUNET_PeerIdentity *sender,
1486                        const struct GNUNET_STREAM_MessageHeader *msg,
1487                        const struct GNUNET_ATS_Information*atsi)
1488 {
1489   struct GNUNET_STREAM_MessageHeader *reply;
1490
1491   switch (socket->state)
1492   {
1493   case STATE_ESTABLISHED:
1494     socket->state = STATE_RECEIVE_CLOSED;
1495
1496     /* Send TRANSMIT_CLOSE_ACK */
1497     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1498     reply->header.type = 
1499       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1500     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1501     queue_message (socket, reply, NULL, NULL);
1502     break;
1503
1504   default:
1505     /* FIXME: Call statistics? */
1506     break;
1507   }
1508   return GNUNET_YES;
1509 }
1510
1511
1512 /**
1513  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1514  *
1515  * @param cls the socket (set from GNUNET_MESH_connect)
1516  * @param tunnel connection to the other end
1517  * @param tunnel_ctx this is NULL
1518  * @param sender who sent the message
1519  * @param message the actual message
1520  * @param atsi performance data for the connection
1521  * @return GNUNET_OK to keep the connection open,
1522  *         GNUNET_SYSERR to close it (signal serious error)
1523  */
1524 static int
1525 client_handle_transmit_close (void *cls,
1526                               struct GNUNET_MESH_Tunnel *tunnel,
1527                               void **tunnel_ctx,
1528                               const struct GNUNET_PeerIdentity *sender,
1529                               const struct GNUNET_MessageHeader *message,
1530                               const struct GNUNET_ATS_Information*atsi)
1531 {
1532   struct GNUNET_STREAM_Socket *socket = cls;
1533   
1534   return handle_transmit_close (socket,
1535                                 tunnel,
1536                                 sender,
1537                                 (struct GNUNET_STREAM_MessageHeader *)message,
1538                                 atsi);
1539 }
1540
1541
1542 /**
1543  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1544  *
1545  * @param socket the socket
1546  * @param tunnel connection to the other end
1547  * @param sender who sent the message
1548  * @param message the actual message
1549  * @param atsi performance data for the connection
1550  * @param operation the close operation which is being ACK'ed
1551  * @return GNUNET_OK to keep the connection open,
1552  *         GNUNET_SYSERR to close it (signal serious error)
1553  */
1554 static int
1555 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1556                           struct GNUNET_MESH_Tunnel *tunnel,
1557                           const struct GNUNET_PeerIdentity *sender,
1558                           const struct GNUNET_STREAM_MessageHeader *message,
1559                           const struct GNUNET_ATS_Information *atsi,
1560                           int operation)
1561 {
1562   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1563
1564   shutdown_handle = socket->shutdown_handle;
1565   if (NULL == shutdown_handle)
1566   {
1567     LOG (GNUNET_ERROR_TYPE_DEBUG,
1568          "Received *CLOSE_ACK when shutdown handle is NULL\n");
1569     return GNUNET_OK;
1570   }
1571
1572   switch (operation)
1573   {
1574   case SHUT_RDWR:
1575     switch (socket->state)
1576     {
1577     case STATE_CLOSE_WAIT:
1578       if (SHUT_RDWR != shutdown_handle->operation)
1579       {
1580         LOG (GNUNET_ERROR_TYPE_DEBUG,
1581              "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n");
1582         return GNUNET_OK;
1583       }
1584
1585       LOG (GNUNET_ERROR_TYPE_DEBUG,
1586            "Received CLOSE_ACK from %x\n",
1587            socket->other_peer);
1588       socket->state = STATE_CLOSED;
1589       break;
1590     default:
1591       LOG (GNUNET_ERROR_TYPE_DEBUG,
1592            "Received CLOSE_ACK when in it not expected\n");
1593       return GNUNET_OK;
1594     }
1595     break;
1596
1597   case SHUT_RD:
1598     switch (socket->state)
1599     {
1600     case STATE_RECEIVE_CLOSE_WAIT:
1601       if (SHUT_RD != shutdown_handle->operation)
1602       {
1603         LOG (GNUNET_ERROR_TYPE_DEBUG,
1604              "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n");
1605         return GNUNET_OK;
1606       }
1607
1608       LOG (GNUNET_ERROR_TYPE_DEBUG,
1609            "Received RECEIVE_CLOSE_ACK from %x\n",
1610            socket->other_peer);
1611       socket->state = STATE_RECEIVE_CLOSED;
1612       break;
1613     default:
1614       LOG (GNUNET_ERROR_TYPE_DEBUG,
1615            "Received RECEIVE_CLOSE_ACK when in it not expected\n");
1616       return GNUNET_OK;
1617     }
1618
1619     break;
1620   case SHUT_WR:
1621     switch (socket->state)
1622     {
1623     case STATE_TRANSMIT_CLOSE_WAIT:
1624       if (SHUT_WR != shutdown_handle->operation)
1625       {
1626         LOG (GNUNET_ERROR_TYPE_DEBUG,
1627              "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
1628         return GNUNET_OK;
1629       }
1630
1631       LOG (GNUNET_ERROR_TYPE_DEBUG,
1632            "Received TRANSMIT_CLOSE_ACK from %x\n",
1633            socket->other_peer);
1634       socket->state = STATE_TRANSMIT_CLOSED;
1635       break;
1636     default:
1637       LOG (GNUNET_ERROR_TYPE_DEBUG,
1638            "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
1639           
1640       return GNUNET_OK;
1641     }
1642     break;
1643   default:
1644     GNUNET_assert (0);
1645   }
1646
1647   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1648     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1649                                    operation);
1650   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1651   socket->shutdown_handle = NULL;
1652   if (GNUNET_SCHEDULER_NO_TASK
1653       != shutdown_handle->close_msg_retransmission_task_id)
1654   {
1655     GNUNET_SCHEDULER_cancel
1656       (shutdown_handle->close_msg_retransmission_task_id);
1657     shutdown_handle->close_msg_retransmission_task_id =
1658       GNUNET_SCHEDULER_NO_TASK;
1659   }
1660   return GNUNET_OK;
1661 }
1662
1663
1664 /**
1665  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1666  *
1667  * @param cls the socket (set from GNUNET_MESH_connect)
1668  * @param tunnel connection to the other end
1669  * @param tunnel_ctx this is NULL
1670  * @param sender who sent the message
1671  * @param message the actual message
1672  * @param atsi performance data for the connection
1673  * @return GNUNET_OK to keep the connection open,
1674  *         GNUNET_SYSERR to close it (signal serious error)
1675  */
1676 static int
1677 client_handle_transmit_close_ack (void *cls,
1678                                   struct GNUNET_MESH_Tunnel *tunnel,
1679                                   void **tunnel_ctx,
1680                                   const struct GNUNET_PeerIdentity *sender,
1681                                   const struct GNUNET_MessageHeader *message,
1682                                   const struct GNUNET_ATS_Information*atsi)
1683 {
1684   struct GNUNET_STREAM_Socket *socket = cls;
1685
1686   return handle_generic_close_ack (socket,
1687                                    tunnel,
1688                                    sender,
1689                                    (const struct GNUNET_STREAM_MessageHeader *)
1690                                    message,
1691                                    atsi,
1692                                    SHUT_WR);
1693 }
1694
1695
1696 /**
1697  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1698  *
1699  * @param socket the socket
1700  * @param tunnel connection to the other end
1701  * @param sender who sent the message
1702  * @param message the actual message
1703  * @param atsi performance data for the connection
1704  * @return GNUNET_OK to keep the connection open,
1705  *         GNUNET_SYSERR to close it (signal serious error)
1706  */
1707 static int
1708 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1709                       struct GNUNET_MESH_Tunnel *tunnel,
1710                       const struct GNUNET_PeerIdentity *sender,
1711                       const struct GNUNET_STREAM_MessageHeader *message,
1712                       const struct GNUNET_ATS_Information *atsi)
1713 {
1714   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1715
1716   switch (socket->state)
1717   {
1718   case STATE_INIT:
1719   case STATE_LISTEN:
1720   case STATE_HELLO_WAIT:
1721     LOG (GNUNET_ERROR_TYPE_DEBUG,
1722          "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1723     return GNUNET_OK;
1724   default:
1725     break;
1726   }
1727   
1728   LOG (GNUNET_ERROR_TYPE_DEBUG,
1729        "Received RECEIVE_CLOSE from %x\n",
1730        socket->other_peer);
1731   receive_close_ack =
1732     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1733   receive_close_ack->header.size =
1734     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1735   receive_close_ack->header.type =
1736     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1737   queue_message (socket,
1738                  receive_close_ack,
1739                  &set_state_closed,
1740                  NULL);
1741   
1742   /* FIXME: Handle the case where write handle is present; the write operation
1743      should be deemed as finised and the write continuation callback
1744      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1745   return GNUNET_OK;
1746 }
1747
1748
1749 /**
1750  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1751  *
1752  * @param cls the socket (set from GNUNET_MESH_connect)
1753  * @param tunnel connection to the other end
1754  * @param tunnel_ctx this is NULL
1755  * @param sender who sent the message
1756  * @param message the actual message
1757  * @param atsi performance data for the connection
1758  * @return GNUNET_OK to keep the connection open,
1759  *         GNUNET_SYSERR to close it (signal serious error)
1760  */
1761 static int
1762 client_handle_receive_close (void *cls,
1763                              struct GNUNET_MESH_Tunnel *tunnel,
1764                              void **tunnel_ctx,
1765                              const struct GNUNET_PeerIdentity *sender,
1766                              const struct GNUNET_MessageHeader *message,
1767                              const struct GNUNET_ATS_Information*atsi)
1768 {
1769   struct GNUNET_STREAM_Socket *socket = cls;
1770
1771   return
1772     handle_receive_close (socket,
1773                           tunnel,
1774                           sender,
1775                           (const struct GNUNET_STREAM_MessageHeader *) message,
1776                           atsi);
1777 }
1778
1779
1780 /**
1781  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1782  *
1783  * @param cls the socket (set from GNUNET_MESH_connect)
1784  * @param tunnel connection to the other end
1785  * @param tunnel_ctx this is NULL
1786  * @param sender who sent the message
1787  * @param message the actual message
1788  * @param atsi performance data for the connection
1789  * @return GNUNET_OK to keep the connection open,
1790  *         GNUNET_SYSERR to close it (signal serious error)
1791  */
1792 static int
1793 client_handle_receive_close_ack (void *cls,
1794                                  struct GNUNET_MESH_Tunnel *tunnel,
1795                                  void **tunnel_ctx,
1796                                  const struct GNUNET_PeerIdentity *sender,
1797                                  const struct GNUNET_MessageHeader *message,
1798                                  const struct GNUNET_ATS_Information*atsi)
1799 {
1800   struct GNUNET_STREAM_Socket *socket = cls;
1801
1802   return handle_generic_close_ack (socket,
1803                                    tunnel,
1804                                    sender,
1805                                    (const struct GNUNET_STREAM_MessageHeader *)
1806                                    message,
1807                                    atsi,
1808                                    SHUT_RD);
1809 }
1810
1811
1812 /**
1813  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1814  *
1815  * @param socket the socket
1816  * @param tunnel connection to the other end
1817  * @param sender who sent the message
1818  * @param message the actual message
1819  * @param atsi performance data for the connection
1820  * @return GNUNET_OK to keep the connection open,
1821  *         GNUNET_SYSERR to close it (signal serious error)
1822  */
1823 static int
1824 handle_close (struct GNUNET_STREAM_Socket *socket,
1825               struct GNUNET_MESH_Tunnel *tunnel,
1826               const struct GNUNET_PeerIdentity *sender,
1827               const struct GNUNET_STREAM_MessageHeader *message,
1828               const struct GNUNET_ATS_Information*atsi)
1829 {
1830   struct GNUNET_STREAM_MessageHeader *close_ack;
1831
1832   switch (socket->state)
1833   {
1834   case STATE_INIT:
1835   case STATE_LISTEN:
1836   case STATE_HELLO_WAIT:
1837     LOG (GNUNET_ERROR_TYPE_DEBUG,
1838          "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1839     return GNUNET_OK;
1840   default:
1841     break;
1842   }
1843
1844   LOG (GNUNET_ERROR_TYPE_DEBUG,
1845        "Received CLOSE from %x\n",
1846        socket->other_peer);
1847   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1848   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1849   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1850   queue_message (socket,
1851                  close_ack,
1852                  &set_state_closed,
1853                  NULL);
1854   if (socket->state == STATE_CLOSED)
1855     return GNUNET_OK;
1856
1857   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1858   socket->receive_buffer = NULL;
1859   socket->receive_buffer_size = 0;
1860   return GNUNET_OK;
1861 }
1862
1863
1864 /**
1865  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1866  *
1867  * @param cls the socket (set from GNUNET_MESH_connect)
1868  * @param tunnel connection to the other end
1869  * @param tunnel_ctx this is NULL
1870  * @param sender who sent the message
1871  * @param message the actual message
1872  * @param atsi performance data for the connection
1873  * @return GNUNET_OK to keep the connection open,
1874  *         GNUNET_SYSERR to close it (signal serious error)
1875  */
1876 static int
1877 client_handle_close (void *cls,
1878                      struct GNUNET_MESH_Tunnel *tunnel,
1879                      void **tunnel_ctx,
1880                      const struct GNUNET_PeerIdentity *sender,
1881                      const struct GNUNET_MessageHeader *message,
1882                      const struct GNUNET_ATS_Information*atsi)
1883 {
1884   struct GNUNET_STREAM_Socket *socket = cls;
1885
1886   return handle_close (socket,
1887                        tunnel,
1888                        sender,
1889                        (const struct GNUNET_STREAM_MessageHeader *) message,
1890                        atsi);
1891 }
1892
1893
1894 /**
1895  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1896  *
1897  * @param cls the socket (set from GNUNET_MESH_connect)
1898  * @param tunnel connection to the other end
1899  * @param tunnel_ctx this is NULL
1900  * @param sender who sent the message
1901  * @param message the actual message
1902  * @param atsi performance data for the connection
1903  * @return GNUNET_OK to keep the connection open,
1904  *         GNUNET_SYSERR to close it (signal serious error)
1905  */
1906 static int
1907 client_handle_close_ack (void *cls,
1908                          struct GNUNET_MESH_Tunnel *tunnel,
1909                          void **tunnel_ctx,
1910                          const struct GNUNET_PeerIdentity *sender,
1911                          const struct GNUNET_MessageHeader *message,
1912                          const struct GNUNET_ATS_Information *atsi)
1913 {
1914   struct GNUNET_STREAM_Socket *socket = cls;
1915
1916   return handle_generic_close_ack (socket,
1917                                    tunnel,
1918                                    sender,
1919                                    (const struct GNUNET_STREAM_MessageHeader *) 
1920                                    message,
1921                                    atsi,
1922                                    SHUT_RDWR);
1923 }
1924
1925 /*****************************/
1926 /* Server's Message Handlers */
1927 /*****************************/
1928
1929 /**
1930  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1931  *
1932  * @param cls the closure
1933  * @param tunnel connection to the other end
1934  * @param tunnel_ctx the socket
1935  * @param sender who sent the message
1936  * @param message the actual message
1937  * @param atsi performance data for the connection
1938  * @return GNUNET_OK to keep the connection open,
1939  *         GNUNET_SYSERR to close it (signal serious error)
1940  */
1941 static int
1942 server_handle_data (void *cls,
1943                     struct GNUNET_MESH_Tunnel *tunnel,
1944                     void **tunnel_ctx,
1945                     const struct GNUNET_PeerIdentity *sender,
1946                     const struct GNUNET_MessageHeader *message,
1947                     const struct GNUNET_ATS_Information*atsi)
1948 {
1949   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1950
1951   return handle_data (socket,
1952                       tunnel,
1953                       sender,
1954                       (const struct GNUNET_STREAM_DataMessage *)message,
1955                       atsi);
1956 }
1957
1958
1959 /**
1960  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1961  *
1962  * @param cls the closure
1963  * @param tunnel connection to the other end
1964  * @param tunnel_ctx the socket
1965  * @param sender who sent the message
1966  * @param message the actual message
1967  * @param atsi performance data for the connection
1968  * @return GNUNET_OK to keep the connection open,
1969  *         GNUNET_SYSERR to close it (signal serious error)
1970  */
1971 static int
1972 server_handle_hello (void *cls,
1973                      struct GNUNET_MESH_Tunnel *tunnel,
1974                      void **tunnel_ctx,
1975                      const struct GNUNET_PeerIdentity *sender,
1976                      const struct GNUNET_MessageHeader *message,
1977                      const struct GNUNET_ATS_Information*atsi)
1978 {
1979   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1980   struct GNUNET_STREAM_HelloAckMessage *reply;
1981
1982   if (GNUNET_PEER_search (sender) != socket->other_peer)
1983   {
1984     LOG (GNUNET_ERROR_TYPE_DEBUG,
1985          "Received HELLO from non-confirming peer\n");
1986     return GNUNET_YES;
1987   }
1988
1989   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1990                  ntohs (message->type));
1991   GNUNET_assert (socket->tunnel == tunnel);
1992   LOG (GNUNET_ERROR_TYPE_DEBUG,
1993        "Received HELLO from %x\n", 
1994        socket->other_peer);
1995
1996   if (STATE_INIT == socket->state)
1997   {
1998     reply = generate_hello_ack_msg (socket);
1999     queue_message (socket, 
2000                    &reply->header,
2001                    &set_state_hello_wait, 
2002                    NULL);
2003   }
2004   else
2005   {
2006     LOG (GNUNET_ERROR_TYPE_DEBUG,
2007          "Client sent HELLO when in state %d\n", socket->state);
2008     /* FIXME: Send RESET? */
2009       
2010   }
2011   return GNUNET_OK;
2012 }
2013
2014
2015 /**
2016  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2017  *
2018  * @param cls the closure
2019  * @param tunnel connection to the other end
2020  * @param tunnel_ctx the socket
2021  * @param sender who sent the message
2022  * @param message the actual message
2023  * @param atsi performance data for the connection
2024  * @return GNUNET_OK to keep the connection open,
2025  *         GNUNET_SYSERR to close it (signal serious error)
2026  */
2027 static int
2028 server_handle_hello_ack (void *cls,
2029                          struct GNUNET_MESH_Tunnel *tunnel,
2030                          void **tunnel_ctx,
2031                          const struct GNUNET_PeerIdentity *sender,
2032                          const struct GNUNET_MessageHeader *message,
2033                          const struct GNUNET_ATS_Information*atsi)
2034 {
2035   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2036   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2037
2038   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2039                  ntohs (message->type));
2040   GNUNET_assert (socket->tunnel == tunnel);
2041   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2042   if (STATE_HELLO_WAIT == socket->state)
2043   {
2044     LOG (GNUNET_ERROR_TYPE_DEBUG,
2045          "Received HELLO_ACK from %x\n",
2046          socket->other_peer);
2047     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2048     LOG (GNUNET_ERROR_TYPE_DEBUG,
2049          "Read sequence number %u\n",
2050          (unsigned int) socket->read_sequence_number);
2051     socket->receiver_window_available = 
2052       ntohl (ack_message->receiver_window_size);
2053     /* Attain ESTABLISHED state */
2054     set_state_established (NULL, socket);
2055   }
2056   else
2057   {
2058     LOG (GNUNET_ERROR_TYPE_DEBUG,
2059          "Client sent HELLO_ACK when in state %d\n", socket->state);
2060     /* FIXME: Send RESET? */
2061       
2062   }
2063   return GNUNET_OK;
2064 }
2065
2066
2067 /**
2068  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2069  *
2070  * @param cls the closure
2071  * @param tunnel connection to the other end
2072  * @param tunnel_ctx the socket
2073  * @param sender who sent the message
2074  * @param message the actual message
2075  * @param atsi performance data for the connection
2076  * @return GNUNET_OK to keep the connection open,
2077  *         GNUNET_SYSERR to close it (signal serious error)
2078  */
2079 static int
2080 server_handle_reset (void *cls,
2081                      struct GNUNET_MESH_Tunnel *tunnel,
2082                      void **tunnel_ctx,
2083                      const struct GNUNET_PeerIdentity *sender,
2084                      const struct GNUNET_MessageHeader *message,
2085                      const struct GNUNET_ATS_Information*atsi)
2086 {
2087   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2088
2089   return GNUNET_OK;
2090 }
2091
2092
2093 /**
2094  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2095  *
2096  * @param cls the closure
2097  * @param tunnel connection to the other end
2098  * @param tunnel_ctx the socket
2099  * @param sender who sent the message
2100  * @param message the actual message
2101  * @param atsi performance data for the connection
2102  * @return GNUNET_OK to keep the connection open,
2103  *         GNUNET_SYSERR to close it (signal serious error)
2104  */
2105 static int
2106 server_handle_transmit_close (void *cls,
2107                               struct GNUNET_MESH_Tunnel *tunnel,
2108                               void **tunnel_ctx,
2109                               const struct GNUNET_PeerIdentity *sender,
2110                               const struct GNUNET_MessageHeader *message,
2111                               const struct GNUNET_ATS_Information*atsi)
2112 {
2113   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2114
2115   return handle_transmit_close (socket,
2116                                 tunnel,
2117                                 sender,
2118                                 (struct GNUNET_STREAM_MessageHeader *)message,
2119                                 atsi);
2120 }
2121
2122
2123 /**
2124  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2125  *
2126  * @param cls the closure
2127  * @param tunnel connection to the other end
2128  * @param tunnel_ctx the socket
2129  * @param sender who sent the message
2130  * @param message the actual message
2131  * @param atsi performance data for the connection
2132  * @return GNUNET_OK to keep the connection open,
2133  *         GNUNET_SYSERR to close it (signal serious error)
2134  */
2135 static int
2136 server_handle_transmit_close_ack (void *cls,
2137                                   struct GNUNET_MESH_Tunnel *tunnel,
2138                                   void **tunnel_ctx,
2139                                   const struct GNUNET_PeerIdentity *sender,
2140                                   const struct GNUNET_MessageHeader *message,
2141                                   const struct GNUNET_ATS_Information*atsi)
2142 {
2143   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2144
2145   return handle_generic_close_ack (socket,
2146                                    tunnel,
2147                                    sender,
2148                                    (const struct GNUNET_STREAM_MessageHeader *)
2149                                    message,
2150                                    atsi,
2151                                    SHUT_WR);
2152 }
2153
2154
2155 /**
2156  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2157  *
2158  * @param cls the closure
2159  * @param tunnel connection to the other end
2160  * @param tunnel_ctx the socket
2161  * @param sender who sent the message
2162  * @param message the actual message
2163  * @param atsi performance data for the connection
2164  * @return GNUNET_OK to keep the connection open,
2165  *         GNUNET_SYSERR to close it (signal serious error)
2166  */
2167 static int
2168 server_handle_receive_close (void *cls,
2169                              struct GNUNET_MESH_Tunnel *tunnel,
2170                              void **tunnel_ctx,
2171                              const struct GNUNET_PeerIdentity *sender,
2172                              const struct GNUNET_MessageHeader *message,
2173                              const struct GNUNET_ATS_Information*atsi)
2174 {
2175   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2176
2177   return
2178     handle_receive_close (socket,
2179                           tunnel,
2180                           sender,
2181                           (const struct GNUNET_STREAM_MessageHeader *) message,
2182                           atsi);
2183 }
2184
2185
2186 /**
2187  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2188  *
2189  * @param cls the closure
2190  * @param tunnel connection to the other end
2191  * @param tunnel_ctx the socket
2192  * @param sender who sent the message
2193  * @param message the actual message
2194  * @param atsi performance data for the connection
2195  * @return GNUNET_OK to keep the connection open,
2196  *         GNUNET_SYSERR to close it (signal serious error)
2197  */
2198 static int
2199 server_handle_receive_close_ack (void *cls,
2200                                  struct GNUNET_MESH_Tunnel *tunnel,
2201                                  void **tunnel_ctx,
2202                                  const struct GNUNET_PeerIdentity *sender,
2203                                  const struct GNUNET_MessageHeader *message,
2204                                  const struct GNUNET_ATS_Information*atsi)
2205 {
2206   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2207
2208   return handle_generic_close_ack (socket,
2209                                    tunnel,
2210                                    sender,
2211                                    (const struct GNUNET_STREAM_MessageHeader *)
2212                                    message,
2213                                    atsi,
2214                                    SHUT_RD);
2215 }
2216
2217
2218 /**
2219  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2220  *
2221  * @param cls the listen socket (from GNUNET_MESH_connect in
2222  *          GNUNET_STREAM_listen) 
2223  * @param tunnel connection to the other end
2224  * @param tunnel_ctx the socket
2225  * @param sender who sent the message
2226  * @param message the actual message
2227  * @param atsi performance data for the connection
2228  * @return GNUNET_OK to keep the connection open,
2229  *         GNUNET_SYSERR to close it (signal serious error)
2230  */
2231 static int
2232 server_handle_close (void *cls,
2233                      struct GNUNET_MESH_Tunnel *tunnel,
2234                      void **tunnel_ctx,
2235                      const struct GNUNET_PeerIdentity *sender,
2236                      const struct GNUNET_MessageHeader *message,
2237                      const struct GNUNET_ATS_Information*atsi)
2238 {
2239   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2240   
2241   return handle_close (socket,
2242                        tunnel,
2243                        sender,
2244                        (const struct GNUNET_STREAM_MessageHeader *) message,
2245                        atsi);
2246 }
2247
2248
2249 /**
2250  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2251  *
2252  * @param cls the closure
2253  * @param tunnel connection to the other end
2254  * @param tunnel_ctx the socket
2255  * @param sender who sent the message
2256  * @param message the actual message
2257  * @param atsi performance data for the connection
2258  * @return GNUNET_OK to keep the connection open,
2259  *         GNUNET_SYSERR to close it (signal serious error)
2260  */
2261 static int
2262 server_handle_close_ack (void *cls,
2263                          struct GNUNET_MESH_Tunnel *tunnel,
2264                          void **tunnel_ctx,
2265                          const struct GNUNET_PeerIdentity *sender,
2266                          const struct GNUNET_MessageHeader *message,
2267                          const struct GNUNET_ATS_Information*atsi)
2268 {
2269   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2270
2271   return handle_generic_close_ack (socket,
2272                                    tunnel,
2273                                    sender,
2274                                    (const struct GNUNET_STREAM_MessageHeader *) 
2275                                    message,
2276                                    atsi,
2277                                    SHUT_RDWR);
2278 }
2279
2280
2281 /**
2282  * Handler for DATA_ACK messages
2283  *
2284  * @param socket the socket through which the ack was received
2285  * @param tunnel connection to the other end
2286  * @param sender who sent the message
2287  * @param ack the acknowledgment message
2288  * @param atsi performance data for the connection
2289  * @return GNUNET_OK to keep the connection open,
2290  *         GNUNET_SYSERR to close it (signal serious error)
2291  */
2292 static int
2293 handle_ack (struct GNUNET_STREAM_Socket *socket,
2294             struct GNUNET_MESH_Tunnel *tunnel,
2295             const struct GNUNET_PeerIdentity *sender,
2296             const struct GNUNET_STREAM_AckMessage *ack,
2297             const struct GNUNET_ATS_Information*atsi)
2298 {
2299   unsigned int packet;
2300   int need_retransmission;
2301   
2302
2303   if (GNUNET_PEER_search (sender) != socket->other_peer)
2304   {
2305     LOG (GNUNET_ERROR_TYPE_DEBUG,
2306          "Received ACK from non-confirming peer\n");
2307     return GNUNET_YES;
2308   }
2309
2310   switch (socket->state)
2311   {
2312   case (STATE_ESTABLISHED):
2313   case (STATE_RECEIVE_CLOSED):
2314   case (STATE_RECEIVE_CLOSE_WAIT):
2315     if (NULL == socket->write_handle)
2316     {
2317       LOG (GNUNET_ERROR_TYPE_DEBUG,
2318            "Received DATA_ACK when write_handle is NULL\n");
2319       return GNUNET_OK;
2320     }
2321     /* FIXME: increment in the base sequence number is breaking current flow
2322      */
2323     if (!((socket->write_sequence_number 
2324            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2325     {
2326       LOG (GNUNET_ERROR_TYPE_DEBUG,
2327            "Received DATA_ACK with unexpected base sequence number\n");
2328       LOG (GNUNET_ERROR_TYPE_DEBUG,
2329            "Current write sequence: %u; Ack's base sequence: %u\n",
2330            socket->write_sequence_number,
2331            ntohl (ack->base_sequence_number));
2332       return GNUNET_OK;
2333     }
2334     /* FIXME: include the case when write_handle is cancelled - ignore the 
2335        acks */
2336
2337     LOG (GNUNET_ERROR_TYPE_DEBUG,
2338          "Received DATA_ACK from %x\n",
2339          socket->other_peer);
2340       
2341     /* Cancel the retransmission task */
2342     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2343     {
2344       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2345       socket->retransmission_timeout_task_id = 
2346         GNUNET_SCHEDULER_NO_TASK;
2347     }
2348
2349     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2350     {
2351       if (NULL == socket->write_handle->messages[packet]) break;
2352       if (ntohl (ack->base_sequence_number)
2353           >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2354         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2355                               packet,
2356                               GNUNET_YES);
2357       else
2358         if (GNUNET_YES == 
2359             ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2360                                   ntohl (socket->write_handle->messages[packet]->sequence_number)
2361                                   - ntohl (ack->base_sequence_number)))
2362           ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2363                                 packet,
2364                                 GNUNET_YES);
2365     }
2366
2367     /* Update the receive window remaining
2368        FIXME : Should update with the value from a data ack with greater
2369        sequence number */
2370     socket->receiver_window_available = 
2371       ntohl (ack->receive_window_remaining);
2372
2373     /* Check if we have received all acknowledgements */
2374     need_retransmission = GNUNET_NO;
2375     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2376     {
2377       if (NULL == socket->write_handle->messages[packet]) break;
2378       if (GNUNET_YES != ackbitmap_is_bit_set 
2379           (&socket->write_handle->ack_bitmap,packet))
2380       {
2381         need_retransmission = GNUNET_YES;
2382         break;
2383       }
2384     }
2385     if (GNUNET_YES == need_retransmission)
2386     {
2387       write_data (socket);
2388     }
2389     else      /* We have to call the write continuation callback now */
2390     {
2391       /* Free the packets */
2392       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2393       {
2394         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2395       }
2396       if (NULL != socket->write_handle->write_cont)
2397         socket->write_handle->write_cont
2398           (socket->write_handle->write_cont_cls,
2399            socket->status,
2400            socket->write_handle->size);
2401       LOG (GNUNET_ERROR_TYPE_DEBUG,
2402            "Write completion callback completed\n");
2403       /* We are done with the write handle - Freeing it */
2404       GNUNET_free (socket->write_handle);
2405       socket->write_handle = NULL;
2406     }
2407     break;
2408   default:
2409     break;
2410   }
2411   return GNUNET_OK;
2412 }
2413
2414
2415 /**
2416  * Handler for DATA_ACK messages
2417  *
2418  * @param cls the 'struct GNUNET_STREAM_Socket'
2419  * @param tunnel connection to the other end
2420  * @param tunnel_ctx unused
2421  * @param sender who sent the message
2422  * @param message the actual message
2423  * @param atsi performance data for the connection
2424  * @return GNUNET_OK to keep the connection open,
2425  *         GNUNET_SYSERR to close it (signal serious error)
2426  */
2427 static int
2428 client_handle_ack (void *cls,
2429                    struct GNUNET_MESH_Tunnel *tunnel,
2430                    void **tunnel_ctx,
2431                    const struct GNUNET_PeerIdentity *sender,
2432                    const struct GNUNET_MessageHeader *message,
2433                    const struct GNUNET_ATS_Information*atsi)
2434 {
2435   struct GNUNET_STREAM_Socket *socket = cls;
2436   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2437  
2438   return handle_ack (socket, tunnel, sender, ack, atsi);
2439 }
2440
2441
2442 /**
2443  * Handler for DATA_ACK messages
2444  *
2445  * @param cls the server's listen socket
2446  * @param tunnel connection to the other end
2447  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2448  * @param sender who sent the message
2449  * @param message the actual message
2450  * @param atsi performance data for the connection
2451  * @return GNUNET_OK to keep the connection open,
2452  *         GNUNET_SYSERR to close it (signal serious error)
2453  */
2454 static int
2455 server_handle_ack (void *cls,
2456                    struct GNUNET_MESH_Tunnel *tunnel,
2457                    void **tunnel_ctx,
2458                    const struct GNUNET_PeerIdentity *sender,
2459                    const struct GNUNET_MessageHeader *message,
2460                    const struct GNUNET_ATS_Information*atsi)
2461 {
2462   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2463   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2464  
2465   return handle_ack (socket, tunnel, sender, ack, atsi);
2466 }
2467
2468
2469 /**
2470  * For client message handlers, the stream socket is in the
2471  * closure argument.
2472  */
2473 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2474   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2475   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2476    sizeof (struct GNUNET_STREAM_AckMessage) },
2477   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2478    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2479   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2480    sizeof (struct GNUNET_STREAM_MessageHeader)},
2481   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2482    sizeof (struct GNUNET_STREAM_MessageHeader)},
2483   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2484    sizeof (struct GNUNET_STREAM_MessageHeader)},
2485   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2486    sizeof (struct GNUNET_STREAM_MessageHeader)},
2487   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2488    sizeof (struct GNUNET_STREAM_MessageHeader)},
2489   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2490    sizeof (struct GNUNET_STREAM_MessageHeader)},
2491   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2492    sizeof (struct GNUNET_STREAM_MessageHeader)},
2493   {NULL, 0, 0}
2494 };
2495
2496
2497 /**
2498  * For server message handlers, the stream socket is in the
2499  * tunnel context, and the listen socket in the closure argument.
2500  */
2501 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2502   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2503   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2504    sizeof (struct GNUNET_STREAM_AckMessage) },
2505   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2506    sizeof (struct GNUNET_STREAM_MessageHeader)},
2507   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2508    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2509   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2510    sizeof (struct GNUNET_STREAM_MessageHeader)},
2511   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2512    sizeof (struct GNUNET_STREAM_MessageHeader)},
2513   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2514    sizeof (struct GNUNET_STREAM_MessageHeader)},
2515   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2516    sizeof (struct GNUNET_STREAM_MessageHeader)},
2517   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2518    sizeof (struct GNUNET_STREAM_MessageHeader)},
2519   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2520    sizeof (struct GNUNET_STREAM_MessageHeader)},
2521   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2522    sizeof (struct GNUNET_STREAM_MessageHeader)},
2523   {NULL, 0, 0}
2524 };
2525
2526
2527 /**
2528  * Function called when our target peer is connected to our tunnel
2529  *
2530  * @param cls the socket for which this tunnel is created
2531  * @param peer the peer identity of the target
2532  * @param atsi performance data for the connection
2533  */
2534 static void
2535 mesh_peer_connect_callback (void *cls,
2536                             const struct GNUNET_PeerIdentity *peer,
2537                             const struct GNUNET_ATS_Information * atsi)
2538 {
2539   struct GNUNET_STREAM_Socket *socket = cls;
2540   struct GNUNET_STREAM_MessageHeader *message;
2541   GNUNET_PEER_Id connected_peer;
2542
2543   connected_peer = GNUNET_PEER_search (peer);
2544   
2545   if (connected_peer != socket->other_peer)
2546   {
2547     LOG (GNUNET_ERROR_TYPE_DEBUG,
2548          "A peer which is not our target has connected to our tunnel\n");
2549     return;
2550   }
2551   
2552   LOG (GNUNET_ERROR_TYPE_DEBUG,
2553        "Target peer %x connected\n", 
2554        connected_peer);
2555   
2556   /* Set state to INIT */
2557   socket->state = STATE_INIT;
2558
2559   /* Send HELLO message */
2560   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2561   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2562   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2563   queue_message (socket,
2564                  message,
2565                  &set_state_hello_wait,
2566                  NULL);
2567
2568   /* Call open callback */
2569   if (NULL == socket->open_cb)
2570   {
2571     LOG (GNUNET_ERROR_TYPE_DEBUG,
2572          "STREAM_open callback is NULL\n");
2573   }
2574 }
2575
2576
2577 /**
2578  * Function called when our target peer is disconnected from our tunnel
2579  *
2580  * @param cls the socket associated which this tunnel
2581  * @param peer the peer identity of the target
2582  */
2583 static void
2584 mesh_peer_disconnect_callback (void *cls,
2585                                const struct GNUNET_PeerIdentity *peer)
2586 {
2587   struct GNUNET_STREAM_Socket *socket=cls;
2588   
2589   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2590   LOG (GNUNET_ERROR_TYPE_DEBUG,
2591        "Other peer %x disconnected\n",
2592        socket->other_peer);
2593 }
2594
2595
2596 /**
2597  * Method called whenever a peer creates a tunnel to us
2598  *
2599  * @param cls closure
2600  * @param tunnel new handle to the tunnel
2601  * @param initiator peer that started the tunnel
2602  * @param atsi performance information for the tunnel
2603  * @return initial tunnel context for the tunnel
2604  *         (can be NULL -- that's not an error)
2605  */
2606 static void *
2607 new_tunnel_notify (void *cls,
2608                    struct GNUNET_MESH_Tunnel *tunnel,
2609                    const struct GNUNET_PeerIdentity *initiator,
2610                    const struct GNUNET_ATS_Information *atsi)
2611 {
2612   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2613   struct GNUNET_STREAM_Socket *socket;
2614
2615   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2616      from the same peer again until the socket is closed */
2617
2618   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2619   socket->other_peer = GNUNET_PEER_intern (initiator);
2620   socket->tunnel = tunnel;
2621   socket->session_id = 0;       /* FIXME */
2622   socket->state = STATE_INIT;
2623   socket->lsocket = lsocket; 
2624   LOG (GNUNET_ERROR_TYPE_DEBUG,
2625        "Peer %x initiated tunnel to us\n", 
2626        socket->other_peer);
2627   
2628   /* FIXME: Copy MESH handle from lsocket to socket */
2629   
2630   return socket;
2631 }
2632
2633
2634 /**
2635  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2636  * any associated state.  This function is NOT called if the client has
2637  * explicitly asked for the tunnel to be destroyed using
2638  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2639  * the tunnel.
2640  *
2641  * @param cls closure (set from GNUNET_MESH_connect)
2642  * @param tunnel connection to the other end (henceforth invalid)
2643  * @param tunnel_ctx place where local state associated
2644  *                   with the tunnel is stored
2645  */
2646 static void 
2647 tunnel_cleaner (void *cls,
2648                 const struct GNUNET_MESH_Tunnel *tunnel,
2649                 void *tunnel_ctx)
2650 {
2651   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2652
2653   if (tunnel != socket->tunnel)
2654     return;
2655
2656   GNUNET_break_op(0);
2657   LOG (GNUNET_ERROR_TYPE_DEBUG,
2658        "Peer %x has terminated connection abruptly\n",
2659        socket->other_peer);
2660
2661   socket->status = GNUNET_STREAM_SHUTDOWN;
2662
2663   /* Clear Transmit handles */
2664   if (NULL != socket->transmit_handle)
2665   {
2666     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2667     socket->transmit_handle = NULL;
2668   }
2669   if (NULL != socket->ack_transmit_handle)
2670   {
2671     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2672     GNUNET_free (socket->ack_msg);
2673     socket->ack_msg = NULL;
2674     socket->ack_transmit_handle = NULL;
2675   }
2676   /* Stop Tasks using socket->tunnel */
2677   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2678   {
2679     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2680     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2681   }
2682   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2683   {
2684     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2685     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2686   }
2687   /* FIXME: Cancel all other tasks using socket->tunnel */
2688   socket->tunnel = NULL;
2689 }
2690
2691
2692 /*****************/
2693 /* API functions */
2694 /*****************/
2695
2696
2697 /**
2698  * Tries to open a stream to the target peer
2699  *
2700  * @param cfg configuration to use
2701  * @param target the target peer to which the stream has to be opened
2702  * @param app_port the application port number which uniquely identifies this
2703  *            stream
2704  * @param open_cb this function will be called after stream has be established 
2705  * @param open_cb_cls the closure for open_cb
2706  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2707  * @return if successful it returns the stream socket; NULL if stream cannot be
2708  *         opened 
2709  */
2710 struct GNUNET_STREAM_Socket *
2711 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2712                     const struct GNUNET_PeerIdentity *target,
2713                     GNUNET_MESH_ApplicationType app_port,
2714                     GNUNET_STREAM_OpenCallback open_cb,
2715                     void *open_cb_cls,
2716                     ...)
2717 {
2718   struct GNUNET_STREAM_Socket *socket;
2719   enum GNUNET_STREAM_Option option;
2720   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2721   va_list vargs;                /* Variable arguments */
2722
2723   LOG (GNUNET_ERROR_TYPE_DEBUG,
2724        "%s\n", __func__);
2725
2726   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2727   socket->other_peer = GNUNET_PEER_intern (target);
2728   socket->open_cb = open_cb;
2729   socket->open_cls = open_cb_cls;
2730   /* Set defaults */
2731   socket->retransmit_timeout = 
2732     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2733
2734   va_start (vargs, open_cb_cls); /* Parse variable args */
2735   do {
2736     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2737     switch (option)
2738     {
2739     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2740       /* Expect struct GNUNET_TIME_Relative */
2741       socket->retransmit_timeout = va_arg (vargs,
2742                                            struct GNUNET_TIME_Relative);
2743       break;
2744     case GNUNET_STREAM_OPTION_END:
2745       break;
2746     }
2747   } while (GNUNET_STREAM_OPTION_END != option);
2748   va_end (vargs);               /* End of variable args parsing */
2749   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2750                                       10,  /* QUEUE size as parameter? */
2751                                       socket, /* cls */
2752                                       NULL, /* No inbound tunnel handler */
2753                                       NULL, /* No in-tunnel cleaner */
2754                                       client_message_handlers,
2755                                       ports); /* We don't get inbound tunnels */
2756   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2757   {
2758     GNUNET_free (socket);
2759     return NULL;
2760   }
2761
2762   /* Now create the mesh tunnel to target */
2763   LOG (GNUNET_ERROR_TYPE_DEBUG,
2764        "Creating MESH Tunnel\n");
2765   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2766                                               NULL, /* Tunnel context */
2767                                               &mesh_peer_connect_callback,
2768                                               &mesh_peer_disconnect_callback,
2769                                               socket);
2770   GNUNET_assert (NULL != socket->tunnel);
2771   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2772                                         target);
2773   
2774   LOG (GNUNET_ERROR_TYPE_DEBUG,
2775        "%s() END\n", __func__);
2776   return socket;
2777 }
2778
2779
2780 /**
2781  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2782  *
2783  * @param socket the stream socket
2784  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2785  * @param completion_cb the callback that will be called upon successful
2786  *          shutdown of given operation
2787  * @param completion_cls the closure for the completion callback
2788  * @return the shutdown handle
2789  */
2790 struct GNUNET_STREAM_ShutdownHandle *
2791 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2792                         int operation,
2793                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2794                         void *completion_cls)
2795 {
2796   struct GNUNET_STREAM_ShutdownHandle *handle;
2797   struct GNUNET_STREAM_MessageHeader *msg;
2798   
2799   GNUNET_assert (NULL == socket->shutdown_handle);
2800
2801   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2802   handle->socket = socket;
2803   handle->completion_cb = completion_cb;
2804   handle->completion_cls = completion_cls;
2805   socket->shutdown_handle = handle;
2806
2807   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2808   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2809   switch (operation)
2810   {
2811   case SHUT_RD:
2812     handle->operation = SHUT_RD;
2813     if (NULL != socket->read_handle)
2814       LOG (GNUNET_ERROR_TYPE_WARNING,
2815            "Existing read handle should be cancelled before shutting"
2816            " down reading\n");
2817     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2818     queue_message (socket,
2819                    msg,
2820                    &set_state_receive_close_wait,
2821                    NULL);
2822     break;
2823   case SHUT_WR:
2824     handle->operation = SHUT_WR;
2825     if (NULL != socket->write_handle)
2826       LOG (GNUNET_ERROR_TYPE_WARNING,
2827            "Existing write handle should be cancelled before shutting"
2828            " down writing\n");
2829     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2830     queue_message (socket,
2831                    msg,
2832                    &set_state_transmit_close_wait,
2833                    NULL);
2834     break;
2835   case SHUT_RDWR:
2836     handle->operation = SHUT_RDWR;
2837     if (NULL != socket->write_handle)
2838       LOG (GNUNET_ERROR_TYPE_WARNING,
2839            "Existing write handle should be cancelled before shutting"
2840            " down writing\n");
2841     if (NULL != socket->read_handle)
2842       LOG (GNUNET_ERROR_TYPE_WARNING,
2843            "Existing read handle should be cancelled before shutting"
2844            " down reading\n");
2845     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2846     queue_message (socket,
2847                    msg,
2848                    &set_state_close_wait,
2849                    NULL);
2850     break;
2851   default:
2852     LOG (GNUNET_ERROR_TYPE_WARNING,
2853          "GNUNET_STREAM_shutdown called with invalid value for "
2854          "parameter operation -- Ignoring\n");
2855     GNUNET_free (msg);
2856     GNUNET_free (handle);
2857     return NULL;
2858   }
2859   handle->close_msg_retransmission_task_id =
2860     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2861                                   &close_msg_retransmission_task,
2862                                   handle);
2863   return handle;
2864 }
2865
2866
2867 /**
2868  * Cancels a pending shutdown
2869  *
2870  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
2871  */
2872 void
2873 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
2874 {
2875   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
2876     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
2877   GNUNET_free (handle);
2878   return;
2879 }
2880
2881
2882 /**
2883  * Closes the stream
2884  *
2885  * @param socket the stream socket
2886  */
2887 void
2888 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2889 {
2890   struct MessageQueue *head;
2891
2892   GNUNET_break (NULL == socket->read_handle);
2893   GNUNET_break (NULL == socket->write_handle);
2894
2895   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2896   {
2897     /* socket closed with read task pending!? */
2898     GNUNET_break (0);
2899     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2900     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2901   }
2902   
2903   /* Terminate the ack'ing tasks if they are still present */
2904   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2905   {
2906     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2907     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2908   }
2909
2910   /* Clear Transmit handles */
2911   if (NULL != socket->transmit_handle)
2912   {
2913     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2914     socket->transmit_handle = NULL;
2915   }
2916   if (NULL != socket->ack_transmit_handle)
2917   {
2918     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2919     GNUNET_free (socket->ack_msg);
2920     socket->ack_msg = NULL;
2921     socket->ack_transmit_handle = NULL;
2922   }
2923
2924   /* Clear existing message queue */
2925   while (NULL != (head = socket->queue_head)) {
2926     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2927                                  socket->queue_tail,
2928                                  head);
2929     GNUNET_free (head->message);
2930     GNUNET_free (head);
2931   }
2932
2933   /* Close associated tunnel */
2934   if (NULL != socket->tunnel)
2935   {
2936     GNUNET_MESH_tunnel_destroy (socket->tunnel);
2937     socket->tunnel = NULL;
2938   }
2939
2940   /* Close mesh connection */
2941   if (NULL != socket->mesh && NULL == socket->lsocket)
2942   {
2943     GNUNET_MESH_disconnect (socket->mesh);
2944     socket->mesh = NULL;
2945   }
2946   
2947   /* Release receive buffer */
2948   if (NULL != socket->receive_buffer)
2949   {
2950     GNUNET_free (socket->receive_buffer);
2951   }
2952
2953   GNUNET_free (socket);
2954 }
2955
2956
2957 /**
2958  * Listens for stream connections for a specific application ports
2959  *
2960  * @param cfg the configuration to use
2961  * @param app_port the application port for which new streams will be accepted
2962  * @param listen_cb this function will be called when a peer tries to establish
2963  *            a stream with us
2964  * @param listen_cb_cls closure for listen_cb
2965  * @return listen socket, NULL for any error
2966  */
2967 struct GNUNET_STREAM_ListenSocket *
2968 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2969                       GNUNET_MESH_ApplicationType app_port,
2970                       GNUNET_STREAM_ListenCallback listen_cb,
2971                       void *listen_cb_cls)
2972 {
2973   /* FIXME: Add variable args for passing configration options? */
2974   struct GNUNET_STREAM_ListenSocket *lsocket;
2975   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2976
2977   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2978   lsocket->port = app_port;
2979   lsocket->listen_cb = listen_cb;
2980   lsocket->listen_cb_cls = listen_cb_cls;
2981   lsocket->mesh = GNUNET_MESH_connect (cfg,
2982                                        10, /* FIXME: QUEUE size as parameter? */
2983                                        lsocket, /* Closure */
2984                                        &new_tunnel_notify,
2985                                        &tunnel_cleaner,
2986                                        server_message_handlers,
2987                                        ports);
2988   GNUNET_assert (NULL != lsocket->mesh);
2989   return lsocket;
2990 }
2991
2992
2993 /**
2994  * Closes the listen socket
2995  *
2996  * @param lsocket the listen socket
2997  */
2998 void
2999 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3000 {
3001   /* Close MESH connection */
3002   GNUNET_assert (NULL != lsocket->mesh);
3003   GNUNET_MESH_disconnect (lsocket->mesh);
3004   
3005   GNUNET_free (lsocket);
3006 }
3007
3008
3009 /**
3010  * Tries to write the given data to the stream. The maximum size of data that
3011  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3012  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3013  * violation, however only the said number of maximum bytes will be written.
3014  *
3015  * @param socket the socket representing a stream
3016  * @param data the data buffer from where the data is written into the stream
3017  * @param size the number of bytes to be written from the data buffer
3018  * @param timeout the timeout period
3019  * @param write_cont the function to call upon writing some bytes into the
3020  *          stream 
3021  * @param write_cont_cls the closure
3022  *
3023  * @return handle to cancel the operation; if a previous write is pending or
3024  *           the stream has been shutdown for this operation then write_cont is
3025  *           immediately called and NULL is returned.
3026  */
3027 struct GNUNET_STREAM_IOWriteHandle *
3028 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3029                      const void *data,
3030                      size_t size,
3031                      struct GNUNET_TIME_Relative timeout,
3032                      GNUNET_STREAM_CompletionContinuation write_cont,
3033                      void *write_cont_cls)
3034 {
3035   unsigned int num_needed_packets;
3036   unsigned int packet;
3037   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3038   uint32_t packet_size;
3039   uint32_t payload_size;
3040   struct GNUNET_STREAM_DataMessage *data_msg;
3041   const void *sweep;
3042   struct GNUNET_TIME_Relative ack_deadline;
3043
3044   LOG (GNUNET_ERROR_TYPE_DEBUG,
3045        "%s\n", __func__);
3046
3047   /* Return NULL if there is already a write request pending */
3048   if (NULL != socket->write_handle)
3049   {
3050     GNUNET_break (0);
3051     return NULL;
3052   }
3053
3054   switch (socket->state)
3055   {
3056   case STATE_TRANSMIT_CLOSED:
3057   case STATE_TRANSMIT_CLOSE_WAIT:
3058   case STATE_CLOSED:
3059   case STATE_CLOSE_WAIT:
3060     if (NULL != write_cont)
3061       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3062     LOG (GNUNET_ERROR_TYPE_DEBUG,
3063          "%s() END\n", __func__);
3064     return NULL;
3065   case STATE_INIT:
3066   case STATE_LISTEN:
3067   case STATE_HELLO_WAIT:
3068     if (NULL != write_cont)
3069       /* FIXME: GNUNET_STREAM_SYSERR?? */
3070       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3071     LOG (GNUNET_ERROR_TYPE_DEBUG,
3072          "%s() END\n", __func__);
3073     return NULL;
3074   case STATE_ESTABLISHED:
3075   case STATE_RECEIVE_CLOSED:
3076   case STATE_RECEIVE_CLOSE_WAIT:
3077     break;
3078   }
3079
3080   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3081     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3082   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3083   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3084   io_handle->socket = socket;
3085   io_handle->write_cont = write_cont;
3086   io_handle->write_cont_cls = write_cont_cls;
3087   io_handle->size = size;
3088   sweep = data;
3089   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3090      determined from RTT */
3091   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3092   /* Divide the given buffer into packets for sending */
3093   for (packet=0; packet < num_needed_packets; packet++)
3094   {
3095     if ((packet + 1) * max_payload_size < size) 
3096     {
3097       payload_size = max_payload_size;
3098       packet_size = MAX_PACKET_SIZE;
3099     }
3100     else 
3101     {
3102       payload_size = size - packet * max_payload_size;
3103       packet_size =  payload_size + sizeof (struct
3104                                             GNUNET_STREAM_DataMessage); 
3105     }
3106     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3107     io_handle->messages[packet]->header.header.size = htons (packet_size);
3108     io_handle->messages[packet]->header.header.type =
3109       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3110     io_handle->messages[packet]->sequence_number =
3111       htonl (socket->write_sequence_number++);
3112     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3113
3114     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3115        determined from RTT */
3116     io_handle->messages[packet]->ack_deadline =
3117       GNUNET_TIME_relative_hton (ack_deadline);
3118     data_msg = io_handle->messages[packet];
3119     /* Copy data from given buffer to the packet */
3120     memcpy (&data_msg[1],
3121             sweep,
3122             payload_size);
3123     sweep += payload_size;
3124     socket->write_offset += payload_size;
3125   }
3126   socket->write_handle = io_handle;
3127   write_data (socket);
3128
3129   LOG (GNUNET_ERROR_TYPE_DEBUG,
3130        "%s() END\n", __func__);
3131
3132   return io_handle;
3133 }
3134
3135
3136
3137 /**
3138  * Tries to read data from the stream.
3139  *
3140  * @param socket the socket representing a stream
3141  * @param timeout the timeout period
3142  * @param proc function to call with data (once only)
3143  * @param proc_cls the closure for proc
3144  *
3145  * @return handle to cancel the operation; if the stream has been shutdown for
3146  *           this type of opeartion then the DataProcessor is immediately
3147  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3148  */
3149 struct GNUNET_STREAM_IOReadHandle *
3150 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3151                     struct GNUNET_TIME_Relative timeout,
3152                     GNUNET_STREAM_DataProcessor proc,
3153                     void *proc_cls)
3154 {
3155   struct GNUNET_STREAM_IOReadHandle *read_handle;
3156   
3157   LOG (GNUNET_ERROR_TYPE_DEBUG,
3158        "%s()\n", 
3159        __func__);
3160
3161   /* Return NULL if there is already a read handle; the user has to cancel that
3162      first before continuing or has to wait until it is completed */
3163   if (NULL != socket->read_handle) return NULL;
3164
3165   GNUNET_assert (NULL != proc);
3166
3167   switch (socket->state)
3168   {
3169   case STATE_RECEIVE_CLOSED:
3170   case STATE_RECEIVE_CLOSE_WAIT:
3171   case STATE_CLOSED:
3172   case STATE_CLOSE_WAIT:
3173     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3174     LOG (GNUNET_ERROR_TYPE_DEBUG,
3175          "%s() END\n",
3176          __func__);
3177     return NULL;
3178   default:
3179     break;
3180   }
3181
3182   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3183   read_handle->proc = proc;
3184   read_handle->proc_cls = proc_cls;
3185   socket->read_handle = read_handle;
3186
3187   /* Check if we have a packet at bitmap 0 */
3188   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3189                                           0))
3190   {
3191     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3192                                                      socket);
3193    
3194   }
3195   
3196   /* Setup the read timeout task */
3197   socket->read_io_timeout_task_id =
3198     GNUNET_SCHEDULER_add_delayed (timeout,
3199                                   &read_io_timeout,
3200                                   socket);
3201   LOG (GNUNET_ERROR_TYPE_DEBUG,
3202        "%s() END\n",
3203        __func__);
3204   return read_handle;
3205 }
3206
3207
3208 /**
3209  * Cancel pending write operation.
3210  *
3211  * @param ioh handle to operation to cancel
3212  */
3213 void
3214 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3215 {
3216   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3217   unsigned int packet;
3218
3219   GNUNET_assert (NULL != socket->write_handle);
3220   GNUNET_assert (socket->write_handle == ioh);
3221
3222   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3223   {
3224     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3225     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3226   }
3227
3228   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3229   {
3230     if (NULL == ioh->messages[packet]) break;
3231     GNUNET_free (ioh->messages[packet]);
3232   }
3233       
3234   GNUNET_free (socket->write_handle);
3235   socket->write_handle = NULL;
3236   return;
3237 }
3238
3239
3240 /**
3241  * Cancel pending read operation.
3242  *
3243  * @param ioh handle to operation to cancel
3244  */
3245 void
3246 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3247 {
3248   return;
3249 }