49227ccec67e7d98812190fa31f2f36791846b49
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define MAX_PACKET_SIZE 512//64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * The maximum payload a data message packet can carry
75  */
76 static const size_t max_payload_size = 
77   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
78
79 /**
80  * states in the Protocol
81  */
82 enum State
83   {
84     /**
85      * Client initialization state
86      */
87     STATE_INIT,
88
89     /**
90      * Listener initialization state 
91      */
92     STATE_LISTEN,
93
94     /**
95      * Pre-connection establishment state
96      */
97     STATE_HELLO_WAIT,
98
99     /**
100      * State where a connection has been established
101      */
102     STATE_ESTABLISHED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_RECEIVE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed for reading
111      */
112     STATE_RECEIVE_CLOSED,
113
114     /**
115      * State where the socket is closed on our side and waiting to be ACK'ed
116      */
117     STATE_TRANSMIT_CLOSE_WAIT,
118
119     /**
120      * State where the socket is closed for writing
121      */
122     STATE_TRANSMIT_CLOSED,
123
124     /**
125      * State where the socket is closed on our side and waiting to be ACK'ed
126      */
127     STATE_CLOSE_WAIT,
128
129     /**
130      * State where the socket is closed
131      */
132     STATE_CLOSED 
133   };
134
135
136 /**
137  * Functions of this type are called when a message is written
138  *
139  * @param cls the closure from queue_message
140  * @param socket the socket the written message was bound to
141  */
142 typedef void (*SendFinishCallback) (void *cls,
143                                     struct GNUNET_STREAM_Socket *socket);
144
145
146 /**
147  * The send message queue
148  */
149 struct MessageQueue
150 {
151   /**
152    * The message
153    */
154   struct GNUNET_STREAM_MessageHeader *message;
155
156   /**
157    * Callback to be called when the message is sent
158    */
159   SendFinishCallback finish_cb;
160
161   /**
162    * The closure for finish_cb
163    */
164   void *finish_cb_cls;
165
166   /**
167    * The next message in queue. Should be NULL in the last message
168    */
169   struct MessageQueue *next;
170
171   /**
172    * The next message in queue. Should be NULL in the first message
173    */
174   struct MessageQueue *prev;
175 };
176
177
178 /**
179  * The STREAM Socket Handler
180  */
181 struct GNUNET_STREAM_Socket
182 {
183   /**
184    * Retransmission timeout
185    */
186   struct GNUNET_TIME_Relative retransmit_timeout;
187
188   /**
189    * The Acknowledgement Bitmap
190    */
191   GNUNET_STREAM_AckBitmap ack_bitmap;
192
193   /**
194    * Time when the Acknowledgement was queued
195    */
196   struct GNUNET_TIME_Absolute ack_time_registered;
197
198   /**
199    * Queued Acknowledgement deadline
200    */
201   struct GNUNET_TIME_Relative ack_time_deadline;
202
203   /**
204    * The mesh handle
205    */
206   struct GNUNET_MESH_Handle *mesh;
207
208   /**
209    * The mesh tunnel handle
210    */
211   struct GNUNET_MESH_Tunnel *tunnel;
212
213   /**
214    * Stream open closure
215    */
216   void *open_cls;
217
218   /**
219    * Stream open callback
220    */
221   GNUNET_STREAM_OpenCallback open_cb;
222
223   /**
224    * The current transmit handle (if a pending transmit request exists)
225    */
226   struct GNUNET_MESH_TransmitHandle *transmit_handle;
227
228   /**
229    * The current act transmit handle (if a pending ack transmit request exists)
230    */
231   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
232
233   /**
234    * Pointer to the current ack message using in ack_task
235    */
236   struct GNUNET_STREAM_AckMessage *ack_msg;
237
238   /**
239    * The current message associated with the transmit handle
240    */
241   struct MessageQueue *queue_head;
242
243   /**
244    * The queue tail, should always point to the last message in queue
245    */
246   struct MessageQueue *queue_tail;
247
248   /**
249    * The write IO_handle associated with this socket
250    */
251   struct GNUNET_STREAM_IOWriteHandle *write_handle;
252
253   /**
254    * The read IO_handle associated with this socket
255    */
256   struct GNUNET_STREAM_IOReadHandle *read_handle;
257
258   /**
259    * The shutdown handle associated with this socket
260    */
261   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
262
263   /**
264    * Buffer for storing received messages
265    */
266   void *receive_buffer;
267
268   /**
269    * The listen socket from which this socket is derived. Should be NULL if it
270    * is not a derived socket
271    */
272   struct GNUNET_STREAM_ListenSocket *lsocket;
273
274   /**
275    * The peer identity of the peer at the other end of the stream
276    */
277   struct GNUNET_PeerIdentity other_peer;
278
279   /**
280    * Task identifier for the read io timeout task
281    */
282   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
283
284   /**
285    * Task identifier for retransmission task after timeout
286    */
287   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
288
289   /**
290    * Task identifier for retransmission of control messages
291    */
292   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
293
294   /**
295    * The task for sending timely Acks
296    */
297   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
298
299   /**
300    * Task scheduled to continue a read operation.
301    */
302   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
303
304   /**
305    * The state of the protocol associated with this socket
306    */
307   enum State state;
308
309   /**
310    * The status of the socket
311    */
312   enum GNUNET_STREAM_Status status;
313
314   /**
315    * The number of previous timeouts; FIXME: currently not used
316    */
317   unsigned int retries;
318
319   /**
320    * The application port number (type: uint32_t)
321    */
322   GNUNET_MESH_ApplicationType app_port;
323
324   /**
325    * Whether testing mode is active or not
326    */
327   int testing_active;
328
329   /**
330    * The write sequence number to be set incase of testing
331    */
332   uint32_t testing_set_write_sequence_number_value;
333
334   /**
335    * The session id associated with this stream connection
336    * FIXME: Not used currently, may be removed
337    */
338   uint32_t session_id;
339
340   /**
341    * Write sequence number. Set to random when sending HELLO(client) and
342    * HELLO_ACK(server) 
343    */
344   uint32_t write_sequence_number;
345
346   /**
347    * Read sequence number. This number's value is determined during handshake
348    */
349   uint32_t read_sequence_number;
350
351   /**
352    * The receiver buffer size
353    */
354   uint32_t receive_buffer_size;
355
356   /**
357    * The receiver buffer boundaries
358    */
359   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
360
361   /**
362    * receiver's available buffer after the last acknowledged packet
363    */
364   uint32_t receiver_window_available;
365
366   /**
367    * The offset pointer used during write operation
368    */
369   uint32_t write_offset;
370
371   /**
372    * The offset after which we are expecting data
373    */
374   uint32_t read_offset;
375
376   /**
377    * The offset upto which user has read from the received buffer
378    */
379   uint32_t copy_offset;
380 };
381
382
383 /**
384  * A socket for listening
385  */
386 struct GNUNET_STREAM_ListenSocket
387 {
388   /**
389    * The mesh handle
390    */
391   struct GNUNET_MESH_Handle *mesh;
392
393   /**
394    * Our configuration
395    */
396   struct GNUNET_CONFIGURATION_Handle *cfg;
397
398   /**
399    * Handle to the lock manager service
400    */
401   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
402
403   /**
404    * The active LockingRequest from lockmanager
405    */
406   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
407
408   /**
409    * Callback to call after acquring a lock and listening
410    */
411   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
412
413   /**
414    * The callback function which is called after successful opening socket
415    */
416   GNUNET_STREAM_ListenCallback listen_cb;
417
418   /**
419    * The call back closure
420    */
421   void *listen_cb_cls;
422
423   /**
424    * The service port
425    */
426   GNUNET_MESH_ApplicationType port;
427   
428   /**
429    * The id of the lockmanager timeout task
430    */
431   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
432
433   /**
434    * The retransmit timeout
435    */
436   struct GNUNET_TIME_Relative retransmit_timeout;
437   
438   /**
439    * Listen enabled?
440    */
441   int listening;
442
443   /**
444    * Whether testing mode is active or not
445    */
446   int testing_active;
447
448   /**
449    * The write sequence number to be set incase of testing
450    */
451   uint32_t testing_set_write_sequence_number_value;
452 };
453
454
455 /**
456  * The IO Write Handle
457  */
458 struct GNUNET_STREAM_IOWriteHandle
459 {
460   /**
461    * The socket to which this write handle is associated
462    */
463   struct GNUNET_STREAM_Socket *socket;
464
465   /**
466    * The packet_buffers associated with this Handle
467    */
468   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
469
470   /**
471    * The write continuation callback
472    */
473   GNUNET_STREAM_CompletionContinuation write_cont;
474
475   /**
476    * Write continuation closure
477    */
478   void *write_cont_cls;
479
480   /**
481    * The bitmap of this IOHandle; Corresponding bit for a message is set when
482    * it has been acknowledged by the receiver
483    */
484   GNUNET_STREAM_AckBitmap ack_bitmap;
485
486   /**
487    * Number of bytes in this write handle
488    */
489   size_t size;
490 };
491
492
493 /**
494  * The IO Read Handle
495  */
496 struct GNUNET_STREAM_IOReadHandle
497 {
498   /**
499    * The socket to which this read handle is associated
500    */
501   struct GNUNET_STREAM_Socket *socket;
502   
503   /**
504    * Callback for the read processor
505    */
506   GNUNET_STREAM_DataProcessor proc;
507
508   /**
509    * The closure pointer for the read processor callback
510    */
511   void *proc_cls;
512 };
513
514
515 /**
516  * Handle for Shutdown
517  */
518 struct GNUNET_STREAM_ShutdownHandle
519 {
520   /**
521    * The socket associated with this shutdown handle
522    */
523   struct GNUNET_STREAM_Socket *socket;
524
525   /**
526    * Shutdown completion callback
527    */
528   GNUNET_STREAM_ShutdownCompletion completion_cb;
529
530   /**
531    * Closure for completion callback
532    */
533   void *completion_cls;
534
535   /**
536    * Close message retransmission task id
537    */
538   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
539
540   /**
541    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
542    */
543   int operation;  
544 };
545
546
547 /**
548  * Default value in seconds for various timeouts
549  */
550 static const unsigned int default_timeout = 10;
551
552 /**
553  * The domain name for locks we use here
554  */
555 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
556
557
558 /**
559  * Callback function for sending queued message
560  *
561  * @param cls closure the socket
562  * @param size number of bytes available in buf
563  * @param buf where the callee should write the message
564  * @return number of bytes written to buf
565  */
566 static size_t
567 send_message_notify (void *cls, size_t size, void *buf)
568 {
569   struct GNUNET_STREAM_Socket *socket = cls;
570   struct MessageQueue *head;
571   size_t ret;
572
573   socket->transmit_handle = NULL; /* Remove the transmit handle */
574   head = socket->queue_head;
575   if (NULL == head)
576     return 0; /* just to be safe */
577   if (0 == size)                /* request timed out */
578   {
579     socket->retries++;
580     LOG (GNUNET_ERROR_TYPE_DEBUG,
581          "%s: Message sending timed out. Retry %d \n",
582          GNUNET_i2s (&socket->other_peer),
583          socket->retries);
584     socket->transmit_handle = 
585       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
586                                          GNUNET_NO, /* Corking */
587                                          /* FIXME: exponential backoff */
588                                          socket->retransmit_timeout,
589                                          &socket->other_peer,
590                                          ntohs (head->message->header.size),
591                                          &send_message_notify,
592                                          socket);
593     return 0;
594   }
595   ret = ntohs (head->message->header.size);
596   GNUNET_assert (size >= ret);
597   memcpy (buf, head->message, ret);
598   if (NULL != head->finish_cb)
599   {
600     head->finish_cb (head->finish_cb_cls, socket);
601   }
602   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
603                                socket->queue_tail,
604                                head);
605   GNUNET_free (head->message);
606   GNUNET_free (head);
607   head = socket->queue_head;
608   if (NULL != head)    /* more pending messages to send */
609   {
610     socket->retries = 0;
611     socket->transmit_handle = 
612       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
613                                          GNUNET_NO, /* Corking */
614                                          /* FIXME: exponential backoff */
615                                          socket->retransmit_timeout,
616                                          &socket->other_peer,
617                                          ntohs (head->message->header.size),
618                                          &send_message_notify,
619                                          socket);
620   }
621   return ret;
622 }
623
624
625 /**
626  * Queues a message for sending using the mesh connection of a socket
627  *
628  * @param socket the socket whose mesh connection is used
629  * @param message the message to be sent
630  * @param finish_cb the callback to be called when the message is sent
631  * @param finish_cb_cls the closure for the callback
632  */
633 static void
634 queue_message (struct GNUNET_STREAM_Socket *socket,
635                struct GNUNET_STREAM_MessageHeader *message,
636                SendFinishCallback finish_cb,
637                void *finish_cb_cls)
638 {
639   struct MessageQueue *queue_entity;
640
641   GNUNET_assert 
642     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
643      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
644
645   LOG (GNUNET_ERROR_TYPE_DEBUG,
646        "%s: Queueing message of type %d and size %d\n",
647        GNUNET_i2s (&socket->other_peer),
648        ntohs (message->header.type),
649        ntohs (message->header.size));
650   GNUNET_assert (NULL != message);
651   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
652   queue_entity->message = message;
653   queue_entity->finish_cb = finish_cb;
654   queue_entity->finish_cb_cls = finish_cb_cls;
655   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
656                                     socket->queue_tail,
657                                     queue_entity);
658   if (NULL == socket->transmit_handle)
659   {
660     socket->retries = 0;
661     socket->transmit_handle = 
662       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
663                                          GNUNET_NO, /* Corking */
664                                          socket->retransmit_timeout,
665                                          &socket->other_peer,
666                                          ntohs (message->header.size),
667                                          &send_message_notify,
668                                          socket);
669   }
670 }
671
672
673 /**
674  * Copies a message and queues it for sending using the mesh connection of
675  * given socket 
676  *
677  * @param socket the socket whose mesh connection is used
678  * @param message the message to be sent
679  * @param finish_cb the callback to be called when the message is sent
680  * @param finish_cb_cls the closure for the callback
681  */
682 static void
683 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
684                         const struct GNUNET_STREAM_MessageHeader *message,
685                         SendFinishCallback finish_cb,
686                         void *finish_cb_cls)
687 {
688   struct GNUNET_STREAM_MessageHeader *msg_copy;
689   uint16_t size;
690   
691   size = ntohs (message->header.size);
692   msg_copy = GNUNET_malloc (size);
693   memcpy (msg_copy, message, size);
694   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
695 }
696
697
698 /**
699  * Callback function for sending ack message
700  *
701  * @param cls closure the ACK message created in ack_task
702  * @param size number of bytes available in buffer
703  * @param buf where the callee should write the message
704  * @return number of bytes written to buf
705  */
706 static size_t
707 send_ack_notify (void *cls, size_t size, void *buf)
708 {
709   struct GNUNET_STREAM_Socket *socket = cls;
710
711   if (0 == size)
712   {
713     LOG (GNUNET_ERROR_TYPE_DEBUG,
714          "%s called with size 0\n", __func__);
715     return 0;
716   }
717   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
718   
719   size = ntohs (socket->ack_msg->header.header.size);
720   memcpy (buf, socket->ack_msg, size);
721   
722   GNUNET_free (socket->ack_msg);
723   socket->ack_msg = NULL;
724   socket->ack_transmit_handle = NULL;
725   return size;
726 }
727
728
729 /**
730  * Writes data using the given socket. The amount of data written is limited by
731  * the receiver_window_size
732  *
733  * @param socket the socket to use
734  */
735 static void 
736 write_data (struct GNUNET_STREAM_Socket *socket);
737
738
739 /**
740  * Task for retransmitting data messages if they aren't ACK before their ack
741  * deadline 
742  *
743  * @param cls the socket
744  * @param tc the Task context
745  */
746 static void
747 data_retransmission_task (void *cls,
748                           const struct GNUNET_SCHEDULER_TaskContext *tc)
749 {
750   struct GNUNET_STREAM_Socket *socket = cls;
751   
752   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
753     return;
754   LOG (GNUNET_ERROR_TYPE_DEBUG,
755        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
756   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
757   write_data (socket);
758 }
759
760
761 /**
762  * Task for sending ACK message
763  *
764  * @param cls the socket
765  * @param tc the Task context
766  */
767 static void
768 ack_task (void *cls,
769           const struct GNUNET_SCHEDULER_TaskContext *tc)
770 {
771   struct GNUNET_STREAM_Socket *socket = cls;
772   struct GNUNET_STREAM_AckMessage *ack_msg;
773
774   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
775   {
776     return;
777   }
778   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
779   /* Create the ACK Message */
780   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
781   ack_msg->header.header.size = htons (sizeof (struct 
782                                                GNUNET_STREAM_AckMessage));
783   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
784   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
785   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
786   ack_msg->receive_window_remaining = 
787     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
788   socket->ack_msg = ack_msg;
789   /* Request MESH for sending ACK */
790   socket->ack_transmit_handle = 
791     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
792                                        GNUNET_NO, /* Corking */
793                                        socket->retransmit_timeout,
794                                        &socket->other_peer,
795                                        ntohs (ack_msg->header.header.size),
796                                        &send_ack_notify,
797                                        socket);
798 }
799
800
801 /**
802  * Retransmission task for shutdown messages
803  *
804  * @param cls the shutdown handle
805  * @param tc the Task Context
806  */
807 static void
808 close_msg_retransmission_task (void *cls,
809                                const struct GNUNET_SCHEDULER_TaskContext *tc)
810 {
811   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
812   struct GNUNET_STREAM_MessageHeader *msg;
813   struct GNUNET_STREAM_Socket *socket;
814
815   GNUNET_assert (NULL != shutdown_handle);
816   socket = shutdown_handle->socket;
817
818   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
819   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
820   switch (shutdown_handle->operation)
821   {
822   case SHUT_RDWR:
823     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
824     break;
825   case SHUT_RD:
826     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
827     break;
828   case SHUT_WR:
829     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
830     break;
831   default:
832     GNUNET_free (msg);
833     shutdown_handle->close_msg_retransmission_task_id = 
834       GNUNET_SCHEDULER_NO_TASK;
835     return;
836   }
837   queue_message (socket, msg, NULL, NULL);
838   shutdown_handle->close_msg_retransmission_task_id =
839     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
840                                   &close_msg_retransmission_task,
841                                   shutdown_handle);
842 }
843
844
845 /**
846  * Function to modify a bit in GNUNET_STREAM_AckBitmap
847  *
848  * @param bitmap the bitmap to modify
849  * @param bit the bit number to modify
850  * @param value GNUNET_YES to on, GNUNET_NO to off
851  */
852 static void
853 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
854                       unsigned int bit, 
855                       int value)
856 {
857   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
858   if (GNUNET_YES == value)
859     *bitmap |= (1LL << bit);
860   else
861     *bitmap &= ~(1LL << bit);
862 }
863
864
865 /**
866  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
867  *
868  * @param bitmap address of the bitmap that has to be checked
869  * @param bit the bit number to check
870  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
871  */
872 static uint8_t
873 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
874                       unsigned int bit)
875 {
876   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
877   return 0 != (*bitmap & (1LL << bit));
878 }
879
880
881 /**
882  * Writes data using the given socket. The amount of data written is limited by
883  * the receiver_window_size
884  *
885  * @param socket the socket to use
886  */
887 static void 
888 write_data (struct GNUNET_STREAM_Socket *socket)
889 {
890   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
891   int packet;                   /* Although an int, should never be negative */
892   int ack_packet;
893
894   ack_packet = -1;
895   /* Find the last acknowledged packet */
896   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
897   {      
898     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
899                                             packet))
900       ack_packet = packet;        
901     else if (NULL == io_handle->messages[packet])
902       break;
903   }
904   /* Resend packets which weren't ack'ed */
905   for (packet=0; packet < ack_packet; packet++)
906   {
907     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
908                                            packet))
909     {
910       LOG (GNUNET_ERROR_TYPE_DEBUG,
911            "%s: Placing DATA message with sequence %u in send queue\n",
912            GNUNET_i2s (&socket->other_peer),
913            ntohl (io_handle->messages[packet]->sequence_number));
914       copy_and_queue_message (socket,
915                               &io_handle->messages[packet]->header,
916                               NULL,
917                               NULL);
918     }
919   }
920   packet = ack_packet + 1;
921   /* Now send new packets if there is enough buffer space */
922   while ( (NULL != io_handle->messages[packet]) &&
923           (socket->receiver_window_available 
924            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
925           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
926   {
927     socket->receiver_window_available -= 
928       ntohs (io_handle->messages[packet]->header.header.size);
929     LOG (GNUNET_ERROR_TYPE_DEBUG,
930          "%s: Placing DATA message with sequence %u in send queue\n",
931          GNUNET_i2s (&socket->other_peer),
932          ntohl (io_handle->messages[packet]->sequence_number));
933     copy_and_queue_message (socket,
934                             &io_handle->messages[packet]->header,
935                             NULL,
936                             NULL);
937     packet++;
938   }
939   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
940     socket->data_retransmission_task_id = 
941       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
942                                     (GNUNET_TIME_UNIT_SECONDS, 8),
943                                     &data_retransmission_task,
944                                     socket);
945 }
946
947
948 /**
949  * Task for calling the read processor
950  *
951  * @param cls the socket
952  * @param tc the task context
953  */
954 static void
955 call_read_processor (void *cls,
956                      const struct GNUNET_SCHEDULER_TaskContext *tc)
957 {
958   struct GNUNET_STREAM_Socket *socket = cls;
959   size_t read_size;
960   size_t valid_read_size;
961   unsigned int packet;
962   uint32_t sequence_increase;
963   uint32_t offset_increase;
964
965   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
966   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
967     return;
968
969   if (NULL == socket->receive_buffer) 
970     return;
971
972   GNUNET_assert (NULL != socket->read_handle);
973   GNUNET_assert (NULL != socket->read_handle->proc);
974
975   /* Check the bitmap for any holes */
976   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
977   {
978     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
979                                            packet))
980       break;
981   }
982   /* We only call read processor if we have the first packet */
983   GNUNET_assert (0 < packet);
984   valid_read_size = 
985     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
986   GNUNET_assert (0 != valid_read_size);
987   /* Cancel the read_io_timeout_task */
988   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
989   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
990   /* Call the data processor */
991   LOG (GNUNET_ERROR_TYPE_DEBUG,
992        "%s: Calling read processor\n",
993        GNUNET_i2s (&socket->other_peer));
994   read_size = 
995     socket->read_handle->proc (socket->read_handle->proc_cls,
996                                socket->status,
997                                socket->receive_buffer + socket->copy_offset,
998                                valid_read_size);
999   LOG (GNUNET_ERROR_TYPE_DEBUG,
1000        "%s: Read processor read %d bytes\n",
1001        GNUNET_i2s (&socket->other_peer), read_size);
1002   LOG (GNUNET_ERROR_TYPE_DEBUG,
1003        "%s: Read processor completed successfully\n",
1004        GNUNET_i2s (&socket->other_peer));
1005   /* Free the read handle */
1006   GNUNET_free (socket->read_handle);
1007   socket->read_handle = NULL;
1008   GNUNET_assert (read_size <= valid_read_size);
1009   socket->copy_offset += read_size;
1010   /* Determine upto which packet we can remove from the buffer */
1011   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1012   {
1013     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1014     { packet++; break; }
1015     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1016       break;
1017   }
1018
1019   /* If no packets can be removed we can't move the buffer */
1020   if (0 == packet) return;
1021   sequence_increase = packet;
1022   LOG (GNUNET_ERROR_TYPE_DEBUG,
1023        "%s: Sequence increase after read processor completion: %u\n",
1024        GNUNET_i2s (&socket->other_peer), sequence_increase);
1025
1026   /* Shift the data in the receive buffer */
1027   socket->receive_buffer = 
1028     memmove (socket->receive_buffer,
1029              socket->receive_buffer 
1030              + socket->receive_buffer_boundaries[sequence_increase-1],
1031              socket->receive_buffer_size
1032              - socket->receive_buffer_boundaries[sequence_increase-1]);
1033   /* Shift the bitmap */
1034   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1035   /* Set read_sequence_number */
1036   socket->read_sequence_number += sequence_increase;
1037   /* Set read_offset */
1038   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1039   socket->read_offset += offset_increase;
1040   /* Fix copy_offset */
1041   GNUNET_assert (offset_increase <= socket->copy_offset);
1042   socket->copy_offset -= offset_increase;
1043   /* Fix relative boundaries */
1044   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1045   {
1046     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1047     {
1048       uint32_t ahead_buffer_boundary;
1049
1050       ahead_buffer_boundary = 
1051         socket->receive_buffer_boundaries[packet + sequence_increase];
1052       if (0 == ahead_buffer_boundary)
1053         socket->receive_buffer_boundaries[packet] = 0;
1054       else
1055       {
1056         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1057         socket->receive_buffer_boundaries[packet] = 
1058           ahead_buffer_boundary - offset_increase;
1059       }
1060     }
1061     else
1062       socket->receive_buffer_boundaries[packet] = 0;
1063   }
1064 }
1065
1066
1067 /**
1068  * Cancels the existing read io handle
1069  *
1070  * @param cls the closure from the SCHEDULER call
1071  * @param tc the task context
1072  */
1073 static void
1074 read_io_timeout (void *cls,
1075                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1076 {
1077   struct GNUNET_STREAM_Socket *socket = cls;
1078   GNUNET_STREAM_DataProcessor proc;
1079   void *proc_cls;
1080
1081   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1082   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1083   {
1084     LOG (GNUNET_ERROR_TYPE_DEBUG,
1085          "%s: Read task timedout - Cancelling it\n",
1086          GNUNET_i2s (&socket->other_peer));
1087     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1088     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1089   }
1090   GNUNET_assert (NULL != socket->read_handle);
1091   proc = socket->read_handle->proc;
1092   proc_cls = socket->read_handle->proc_cls;
1093   GNUNET_free (socket->read_handle);
1094   socket->read_handle = NULL;
1095   /* Call the read processor to signal timeout */
1096   proc (proc_cls,
1097         GNUNET_STREAM_TIMEOUT,
1098         NULL,
1099         0);
1100 }
1101
1102
1103 /**
1104  * Handler for DATA messages; Same for both client and server
1105  *
1106  * @param socket the socket through which the ack was received
1107  * @param tunnel connection to the other end
1108  * @param sender who sent the message
1109  * @param msg the data message
1110  * @param atsi performance data for the connection
1111  * @return GNUNET_OK to keep the connection open,
1112  *         GNUNET_SYSERR to close it (signal serious error)
1113  */
1114 static int
1115 handle_data (struct GNUNET_STREAM_Socket *socket,
1116              struct GNUNET_MESH_Tunnel *tunnel,
1117              const struct GNUNET_PeerIdentity *sender,
1118              const struct GNUNET_STREAM_DataMessage *msg,
1119              const struct GNUNET_ATS_Information*atsi)
1120 {
1121   const void *payload;
1122   uint32_t bytes_needed;
1123   uint32_t relative_offset;
1124   uint32_t relative_sequence_number;
1125   uint16_t size;
1126
1127   size = htons (msg->header.header.size);
1128   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1129   {
1130     GNUNET_break_op (0);
1131     return GNUNET_SYSERR;
1132   }
1133
1134   if (0 != memcmp (sender,
1135                    &socket->other_peer,
1136                    sizeof (struct GNUNET_PeerIdentity)))
1137   {
1138     LOG (GNUNET_ERROR_TYPE_DEBUG,
1139          "%s: Received DATA from non-confirming peer\n",
1140          GNUNET_i2s (&socket->other_peer));
1141     return GNUNET_YES;
1142   }
1143
1144   switch (socket->state)
1145   {
1146   case STATE_ESTABLISHED:
1147   case STATE_TRANSMIT_CLOSED:
1148   case STATE_TRANSMIT_CLOSE_WAIT:
1149       
1150     /* check if the message's sequence number is in the range we are
1151        expecting */
1152     relative_sequence_number = 
1153       ntohl (msg->sequence_number) - socket->read_sequence_number;
1154     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1155     {
1156       LOG (GNUNET_ERROR_TYPE_DEBUG,
1157            "%s: Ignoring received message with sequence number %u\n",
1158            GNUNET_i2s (&socket->other_peer),
1159            ntohl (msg->sequence_number));
1160       /* Start ACK sending task if one is not already present */
1161       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1162       {
1163         socket->ack_task_id = 
1164           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1165                                         (msg->ack_deadline),
1166                                         &ack_task,
1167                                         socket);
1168       }
1169       return GNUNET_YES;
1170     }
1171       
1172     /* Check if we have already seen this message */
1173     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1174                                             relative_sequence_number))
1175     {
1176       LOG (GNUNET_ERROR_TYPE_DEBUG,
1177            "%s: Ignoring already received message with sequence number %u\n",
1178            GNUNET_i2s (&socket->other_peer),
1179            ntohl (msg->sequence_number));
1180       /* Start ACK sending task if one is not already present */
1181       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1182       {
1183         socket->ack_task_id = 
1184           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1185                                         (msg->ack_deadline),
1186                                         &ack_task,
1187                                         socket);
1188       }
1189       return GNUNET_YES;
1190     }
1191
1192     LOG (GNUNET_ERROR_TYPE_DEBUG,
1193          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1194          GNUNET_i2s (&socket->other_peer),
1195          ntohl (msg->sequence_number),
1196          ntohs (msg->header.header.size),
1197          GNUNET_i2s (&socket->other_peer));
1198       
1199     /* Check if we have to allocate the buffer */
1200     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1201     relative_offset = ntohl (msg->offset) - socket->read_offset;
1202     bytes_needed = relative_offset + size;
1203     if (bytes_needed > socket->receive_buffer_size)
1204     {
1205       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1206       {
1207         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1208                                                  bytes_needed);
1209         socket->receive_buffer_size = bytes_needed;
1210       }
1211       else
1212       {
1213         LOG (GNUNET_ERROR_TYPE_DEBUG,
1214              "%s: Cannot accommodate packet %d as buffer is full\n",
1215              GNUNET_i2s (&socket->other_peer),
1216              ntohl (msg->sequence_number));
1217         return GNUNET_YES;
1218       }
1219     }
1220       
1221     /* Copy Data to buffer */
1222     payload = &msg[1];
1223     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1224     memcpy (socket->receive_buffer + relative_offset,
1225             payload,
1226             size);
1227     socket->receive_buffer_boundaries[relative_sequence_number] = 
1228       relative_offset + size;
1229       
1230     /* Modify the ACK bitmap */
1231     ackbitmap_modify_bit (&socket->ack_bitmap,
1232                           relative_sequence_number,
1233                           GNUNET_YES);
1234
1235     /* Start ACK sending task if one is not already present */
1236     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1237     {
1238       socket->ack_task_id = 
1239         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1240                                       (msg->ack_deadline),
1241                                       &ack_task,
1242                                       socket);
1243     }
1244
1245     if ((NULL != socket->read_handle) /* A read handle is waiting */
1246         /* There is no current read task */
1247         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1248         /* We have the first packet */
1249         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1250                                                0)))
1251     {
1252       LOG (GNUNET_ERROR_TYPE_DEBUG,
1253            "%s: Scheduling read processor\n",
1254            GNUNET_i2s (&socket->other_peer));
1255           
1256       socket->read_task_id = 
1257         GNUNET_SCHEDULER_add_now (&call_read_processor,
1258                                   socket);
1259     }
1260       
1261     break;
1262
1263   default:
1264     LOG (GNUNET_ERROR_TYPE_DEBUG,
1265          "%s: Received data message when it cannot be handled\n",
1266          GNUNET_i2s (&socket->other_peer));
1267     break;
1268   }
1269   return GNUNET_YES;
1270 }
1271
1272
1273 /**
1274  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1275  *
1276  * @param cls the socket (set from GNUNET_MESH_connect)
1277  * @param tunnel connection to the other end
1278  * @param tunnel_ctx place to store local state associated with the tunnel
1279  * @param sender who sent the message
1280  * @param message the actual message
1281  * @param atsi performance data for the connection
1282  * @return GNUNET_OK to keep the connection open,
1283  *         GNUNET_SYSERR to close it (signal serious error)
1284  */
1285 static int
1286 client_handle_data (void *cls,
1287                     struct GNUNET_MESH_Tunnel *tunnel,
1288                     void **tunnel_ctx,
1289                     const struct GNUNET_PeerIdentity *sender,
1290                     const struct GNUNET_MessageHeader *message,
1291                     const struct GNUNET_ATS_Information*atsi)
1292 {
1293   struct GNUNET_STREAM_Socket *socket = cls;
1294
1295   return handle_data (socket, 
1296                       tunnel, 
1297                       sender, 
1298                       (const struct GNUNET_STREAM_DataMessage *) message, 
1299                       atsi);
1300 }
1301
1302
1303 /**
1304  * Callback to set state to ESTABLISHED
1305  *
1306  * @param cls the closure NULL;
1307  * @param socket the socket to requiring state change
1308  */
1309 static void
1310 set_state_established (void *cls,
1311                        struct GNUNET_STREAM_Socket *socket)
1312 {
1313   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1314        "%s: Attaining ESTABLISHED state\n",
1315        GNUNET_i2s (&socket->other_peer));
1316   socket->write_offset = 0;
1317   socket->read_offset = 0;
1318   socket->state = STATE_ESTABLISHED;
1319   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1320                  socket->control_retransmission_task_id);
1321   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1322   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1323   if (NULL != socket->lsocket)
1324   {
1325     LOG (GNUNET_ERROR_TYPE_DEBUG,
1326          "%s: Calling listen callback\n",
1327          GNUNET_i2s (&socket->other_peer));
1328     if (GNUNET_SYSERR == 
1329         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1330                                     socket,
1331                                     &socket->other_peer))
1332     {
1333       socket->state = STATE_CLOSED;
1334       /* FIXME: We should close in a decent way (send RST) */
1335       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1336       GNUNET_free (socket);
1337     }
1338   }
1339   else
1340     socket->open_cb (socket->open_cls, socket);
1341 }
1342
1343
1344 /**
1345  * Callback to set state to HELLO_WAIT
1346  *
1347  * @param cls the closure from queue_message
1348  * @param socket the socket to requiring state change
1349  */
1350 static void
1351 set_state_hello_wait (void *cls,
1352                       struct GNUNET_STREAM_Socket *socket)
1353 {
1354   GNUNET_assert (STATE_INIT == socket->state);
1355   LOG (GNUNET_ERROR_TYPE_DEBUG,
1356        "%s: Attaining HELLO_WAIT state\n",
1357        GNUNET_i2s (&socket->other_peer));
1358   socket->state = STATE_HELLO_WAIT;
1359 }
1360
1361
1362 /**
1363  * Callback to set state to CLOSE_WAIT
1364  *
1365  * @param cls the closure from queue_message
1366  * @param socket the socket requiring state change
1367  */
1368 static void
1369 set_state_close_wait (void *cls,
1370                       struct GNUNET_STREAM_Socket *socket)
1371 {
1372   LOG (GNUNET_ERROR_TYPE_DEBUG,
1373        "%s: Attaing CLOSE_WAIT state\n",
1374        GNUNET_i2s (&socket->other_peer));
1375   socket->state = STATE_CLOSE_WAIT;
1376   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1377   socket->receive_buffer = NULL;
1378   socket->receive_buffer_size = 0;
1379 }
1380
1381
1382 /**
1383  * Callback to set state to RECEIVE_CLOSE_WAIT
1384  *
1385  * @param cls the closure from queue_message
1386  * @param socket the socket requiring state change
1387  */
1388 static void
1389 set_state_receive_close_wait (void *cls,
1390                               struct GNUNET_STREAM_Socket *socket)
1391 {
1392   LOG (GNUNET_ERROR_TYPE_DEBUG,
1393        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1394        GNUNET_i2s (&socket->other_peer));
1395   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1396   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1397   socket->receive_buffer = NULL;
1398   socket->receive_buffer_size = 0;
1399 }
1400
1401
1402 /**
1403  * Callback to set state to TRANSMIT_CLOSE_WAIT
1404  *
1405  * @param cls the closure from queue_message
1406  * @param socket the socket requiring state change
1407  */
1408 static void
1409 set_state_transmit_close_wait (void *cls,
1410                                struct GNUNET_STREAM_Socket *socket)
1411 {
1412   LOG (GNUNET_ERROR_TYPE_DEBUG,
1413        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1414        GNUNET_i2s (&socket->other_peer));
1415   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1416 }
1417
1418
1419 /**
1420  * Callback to set state to CLOSED
1421  *
1422  * @param cls the closure from queue_message
1423  * @param socket the socket requiring state change
1424  */
1425 static void
1426 set_state_closed (void *cls,
1427                   struct GNUNET_STREAM_Socket *socket)
1428 {
1429   socket->state = STATE_CLOSED;
1430 }
1431
1432
1433 /**
1434  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1435  *
1436  * @return the generate hello message
1437  */
1438 static struct GNUNET_STREAM_MessageHeader *
1439 generate_hello (void)
1440 {
1441   struct GNUNET_STREAM_MessageHeader *msg;
1442
1443   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1444   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1445   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1446   return msg;
1447 }
1448
1449
1450 /**
1451  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1452  * socket
1453  *
1454  * @param socket the socket for which this HelloAckMessage has to be generated
1455  * @param generate_seq GNUNET_YES to generate the write sequence number,
1456  *          GNUNET_NO to use the existing sequence number
1457  * @return the HelloAckMessage
1458  */
1459 static struct GNUNET_STREAM_HelloAckMessage *
1460 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1461                     int generate_seq)
1462 {
1463   struct GNUNET_STREAM_HelloAckMessage *msg;
1464
1465   if (GNUNET_YES == generate_seq)
1466   {
1467     if (GNUNET_YES == socket->testing_active)
1468       socket->write_sequence_number =
1469         socket->testing_set_write_sequence_number_value;
1470     else
1471       socket->write_sequence_number = 
1472         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1473     LOG_DEBUG ("%s: write sequence number %u\n",
1474                GNUNET_i2s (&socket->other_peer),
1475                (unsigned int) socket->write_sequence_number);
1476   }
1477   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1478   msg->header.header.size = 
1479     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1480   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1481   msg->sequence_number = htonl (socket->write_sequence_number);
1482   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1483   return msg;
1484 }
1485
1486
1487 /**
1488  * Task for retransmitting control messages if they aren't ACK'ed before a
1489  * deadline
1490  *
1491  * @param cls the socket
1492  * @param tc the Task context
1493  */
1494 static void
1495 control_retransmission_task (void *cls,
1496                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1497 {
1498   struct GNUNET_STREAM_Socket *socket = cls;
1499     
1500   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1501     return;
1502   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1503   LOG_DEBUG ("%s: Retransmitting a control message\n",
1504                  GNUNET_i2s (&socket->other_peer));
1505   switch (socket->status)
1506   {
1507   case STATE_INIT:    
1508     GNUNET_break (0);
1509     break;
1510   case STATE_LISTEN:
1511     GNUNET_break (0);
1512     break;
1513   case STATE_HELLO_WAIT:
1514     if (NULL == socket->lsocket) /* We are client */
1515       queue_message (socket, generate_hello (), NULL, NULL);
1516     else
1517       queue_message (socket,
1518                      (struct GNUNET_STREAM_MessageHeader *)
1519                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
1520     socket->control_retransmission_task_id =
1521     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1522                                   &control_retransmission_task, socket);
1523     break;
1524   case STATE_ESTABLISHED:
1525     if (NULL == socket->lsocket)
1526       queue_message (socket,
1527                      (struct GNUNET_STREAM_MessageHeader *)
1528                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
1529     else
1530       GNUNET_break (0);
1531   default:
1532     GNUNET_break (0);
1533   }  
1534 }
1535
1536
1537 /**
1538  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1539  *
1540  * @param cls the socket (set from GNUNET_MESH_connect)
1541  * @param tunnel connection to the other end
1542  * @param tunnel_ctx this is NULL
1543  * @param sender who sent the message
1544  * @param message the actual message
1545  * @param atsi performance data for the connection
1546  * @return GNUNET_OK to keep the connection open,
1547  *         GNUNET_SYSERR to close it (signal serious error)
1548  */
1549 static int
1550 client_handle_hello_ack (void *cls,
1551                          struct GNUNET_MESH_Tunnel *tunnel,
1552                          void **tunnel_ctx,
1553                          const struct GNUNET_PeerIdentity *sender,
1554                          const struct GNUNET_MessageHeader *message,
1555                          const struct GNUNET_ATS_Information*atsi)
1556 {
1557   struct GNUNET_STREAM_Socket *socket = cls;
1558   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1559   struct GNUNET_STREAM_HelloAckMessage *reply;
1560
1561   if (0 != memcmp (sender,
1562                    &socket->other_peer,
1563                    sizeof (struct GNUNET_PeerIdentity)))
1564   {
1565     LOG (GNUNET_ERROR_TYPE_DEBUG,
1566          "%s: Received HELLO_ACK from non-confirming peer\n",
1567          GNUNET_i2s (&socket->other_peer));
1568     return GNUNET_YES;
1569   }
1570   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1571   LOG (GNUNET_ERROR_TYPE_DEBUG,
1572        "%s: Received HELLO_ACK from %s\n",
1573        GNUNET_i2s (&socket->other_peer),
1574        GNUNET_i2s (&socket->other_peer));
1575
1576   GNUNET_assert (socket->tunnel == tunnel);
1577   switch (socket->state)
1578   {
1579   case STATE_HELLO_WAIT:
1580     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1581     LOG (GNUNET_ERROR_TYPE_DEBUG,
1582          "%s: Read sequence number %u\n",
1583          GNUNET_i2s (&socket->other_peer),
1584          (unsigned int) socket->read_sequence_number);
1585     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1586     reply = generate_hello_ack (socket, GNUNET_YES);
1587     queue_message (socket,
1588                    &reply->header, 
1589                    &set_state_established,
1590                    NULL);    
1591     return GNUNET_OK;
1592   case STATE_ESTABLISHED:
1593     // call statistics (# ACKs ignored++)
1594     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1595                    socket->control_retransmission_task_id);
1596     socket->control_retransmission_task_id =
1597       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1598     return GNUNET_OK;
1599   default:
1600     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1601                GNUNET_i2s (&socket->other_peer),
1602                GNUNET_i2s (&socket->other_peer),
1603                socket->state);
1604     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1605     return GNUNET_SYSERR;
1606   }
1607 }
1608
1609
1610 /**
1611  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1612  *
1613  * @param cls the socket (set from GNUNET_MESH_connect)
1614  * @param tunnel connection to the other end
1615  * @param tunnel_ctx this is NULL
1616  * @param sender who sent the message
1617  * @param message the actual message
1618  * @param atsi performance data for the connection
1619  * @return GNUNET_OK to keep the connection open,
1620  *         GNUNET_SYSERR to close it (signal serious error)
1621  */
1622 static int
1623 client_handle_reset (void *cls,
1624                      struct GNUNET_MESH_Tunnel *tunnel,
1625                      void **tunnel_ctx,
1626                      const struct GNUNET_PeerIdentity *sender,
1627                      const struct GNUNET_MessageHeader *message,
1628                      const struct GNUNET_ATS_Information*atsi)
1629 {
1630   // struct GNUNET_STREAM_Socket *socket = cls;
1631
1632   return GNUNET_OK;
1633 }
1634
1635
1636 /**
1637  * Common message handler for handling TRANSMIT_CLOSE messages
1638  *
1639  * @param socket the socket through which the ack was received
1640  * @param tunnel connection to the other end
1641  * @param sender who sent the message
1642  * @param msg the transmit close message
1643  * @param atsi performance data for the connection
1644  * @return GNUNET_OK to keep the connection open,
1645  *         GNUNET_SYSERR to close it (signal serious error)
1646  */
1647 static int
1648 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1649                        struct GNUNET_MESH_Tunnel *tunnel,
1650                        const struct GNUNET_PeerIdentity *sender,
1651                        const struct GNUNET_STREAM_MessageHeader *msg,
1652                        const struct GNUNET_ATS_Information*atsi)
1653 {
1654   struct GNUNET_STREAM_MessageHeader *reply;
1655
1656   switch (socket->state)
1657   {
1658   case STATE_ESTABLISHED:
1659     socket->state = STATE_RECEIVE_CLOSED;
1660
1661     /* Send TRANSMIT_CLOSE_ACK */
1662     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1663     reply->header.type = 
1664       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1665     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1666     queue_message (socket, reply, NULL, NULL);
1667     break;
1668
1669   default:
1670     /* FIXME: Call statistics? */
1671     break;
1672   }
1673   return GNUNET_YES;
1674 }
1675
1676
1677 /**
1678  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1679  *
1680  * @param cls the socket (set from GNUNET_MESH_connect)
1681  * @param tunnel connection to the other end
1682  * @param tunnel_ctx this is NULL
1683  * @param sender who sent the message
1684  * @param message the actual message
1685  * @param atsi performance data for the connection
1686  * @return GNUNET_OK to keep the connection open,
1687  *         GNUNET_SYSERR to close it (signal serious error)
1688  */
1689 static int
1690 client_handle_transmit_close (void *cls,
1691                               struct GNUNET_MESH_Tunnel *tunnel,
1692                               void **tunnel_ctx,
1693                               const struct GNUNET_PeerIdentity *sender,
1694                               const struct GNUNET_MessageHeader *message,
1695                               const struct GNUNET_ATS_Information*atsi)
1696 {
1697   struct GNUNET_STREAM_Socket *socket = cls;
1698   
1699   return handle_transmit_close (socket,
1700                                 tunnel,
1701                                 sender,
1702                                 (struct GNUNET_STREAM_MessageHeader *)message,
1703                                 atsi);
1704 }
1705
1706
1707 /**
1708  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1709  *
1710  * @param socket the socket
1711  * @param tunnel connection to the other end
1712  * @param sender who sent the message
1713  * @param message the actual message
1714  * @param atsi performance data for the connection
1715  * @param operation the close operation which is being ACK'ed
1716  * @return GNUNET_OK to keep the connection open,
1717  *         GNUNET_SYSERR to close it (signal serious error)
1718  */
1719 static int
1720 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1721                           struct GNUNET_MESH_Tunnel *tunnel,
1722                           const struct GNUNET_PeerIdentity *sender,
1723                           const struct GNUNET_STREAM_MessageHeader *message,
1724                           const struct GNUNET_ATS_Information *atsi,
1725                           int operation)
1726 {
1727   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1728
1729   shutdown_handle = socket->shutdown_handle;
1730   if (NULL == shutdown_handle)
1731   {
1732     LOG (GNUNET_ERROR_TYPE_DEBUG,
1733          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1734          GNUNET_i2s (&socket->other_peer));
1735     return GNUNET_OK;
1736   }
1737
1738   switch (operation)
1739   {
1740   case SHUT_RDWR:
1741     switch (socket->state)
1742     {
1743     case STATE_CLOSE_WAIT:
1744       if (SHUT_RDWR != shutdown_handle->operation)
1745       {
1746         LOG (GNUNET_ERROR_TYPE_DEBUG,
1747              "%s: Received CLOSE_ACK when shutdown handle is not for "
1748              "SHUT_RDWR\n",
1749              GNUNET_i2s (&socket->other_peer));
1750         return GNUNET_OK;
1751       }
1752
1753       LOG (GNUNET_ERROR_TYPE_DEBUG,
1754            "%s: Received CLOSE_ACK from %s\n",
1755            GNUNET_i2s (&socket->other_peer),
1756            GNUNET_i2s (&socket->other_peer));
1757       socket->state = STATE_CLOSED;
1758       break;
1759     default:
1760       LOG (GNUNET_ERROR_TYPE_DEBUG,
1761            "%s: Received CLOSE_ACK when in it not expected\n",
1762            GNUNET_i2s (&socket->other_peer));
1763       return GNUNET_OK;
1764     }
1765     break;
1766
1767   case SHUT_RD:
1768     switch (socket->state)
1769     {
1770     case STATE_RECEIVE_CLOSE_WAIT:
1771       if (SHUT_RD != shutdown_handle->operation)
1772       {
1773         LOG (GNUNET_ERROR_TYPE_DEBUG,
1774              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1775              "is not for SHUT_RD\n",
1776              GNUNET_i2s (&socket->other_peer));
1777         return GNUNET_OK;
1778       }
1779
1780       LOG (GNUNET_ERROR_TYPE_DEBUG,
1781            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1782            GNUNET_i2s (&socket->other_peer),
1783            GNUNET_i2s (&socket->other_peer));
1784       socket->state = STATE_RECEIVE_CLOSED;
1785       break;
1786     default:
1787       LOG (GNUNET_ERROR_TYPE_DEBUG,
1788            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1789            GNUNET_i2s (&socket->other_peer));
1790       return GNUNET_OK;
1791     }
1792
1793     break;
1794   case SHUT_WR:
1795     switch (socket->state)
1796     {
1797     case STATE_TRANSMIT_CLOSE_WAIT:
1798       if (SHUT_WR != shutdown_handle->operation)
1799       {
1800         LOG (GNUNET_ERROR_TYPE_DEBUG,
1801              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1802              "is not for SHUT_WR\n",
1803              GNUNET_i2s (&socket->other_peer));
1804         return GNUNET_OK;
1805       }
1806
1807       LOG (GNUNET_ERROR_TYPE_DEBUG,
1808            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1809            GNUNET_i2s (&socket->other_peer),
1810            GNUNET_i2s (&socket->other_peer));
1811       socket->state = STATE_TRANSMIT_CLOSED;
1812       break;
1813     default:
1814       LOG (GNUNET_ERROR_TYPE_DEBUG,
1815            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1816            GNUNET_i2s (&socket->other_peer));
1817           
1818       return GNUNET_OK;
1819     }
1820     break;
1821   default:
1822     GNUNET_assert (0);
1823   }
1824
1825   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1826     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1827                                    operation);
1828   if (GNUNET_SCHEDULER_NO_TASK
1829       != shutdown_handle->close_msg_retransmission_task_id)
1830   {
1831     GNUNET_SCHEDULER_cancel
1832       (shutdown_handle->close_msg_retransmission_task_id);
1833     shutdown_handle->close_msg_retransmission_task_id =
1834       GNUNET_SCHEDULER_NO_TASK;
1835   }
1836   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1837   socket->shutdown_handle = NULL;
1838   return GNUNET_OK;
1839 }
1840
1841
1842 /**
1843  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1844  *
1845  * @param cls the socket (set from GNUNET_MESH_connect)
1846  * @param tunnel connection to the other end
1847  * @param tunnel_ctx this is NULL
1848  * @param sender who sent the message
1849  * @param message the actual message
1850  * @param atsi performance data for the connection
1851  * @return GNUNET_OK to keep the connection open,
1852  *         GNUNET_SYSERR to close it (signal serious error)
1853  */
1854 static int
1855 client_handle_transmit_close_ack (void *cls,
1856                                   struct GNUNET_MESH_Tunnel *tunnel,
1857                                   void **tunnel_ctx,
1858                                   const struct GNUNET_PeerIdentity *sender,
1859                                   const struct GNUNET_MessageHeader *message,
1860                                   const struct GNUNET_ATS_Information*atsi)
1861 {
1862   struct GNUNET_STREAM_Socket *socket = cls;
1863
1864   return handle_generic_close_ack (socket,
1865                                    tunnel,
1866                                    sender,
1867                                    (const struct GNUNET_STREAM_MessageHeader *)
1868                                    message,
1869                                    atsi,
1870                                    SHUT_WR);
1871 }
1872
1873
1874 /**
1875  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1876  *
1877  * @param socket the socket
1878  * @param tunnel connection to the other end
1879  * @param sender who sent the message
1880  * @param message the actual message
1881  * @param atsi performance data for the connection
1882  * @return GNUNET_OK to keep the connection open,
1883  *         GNUNET_SYSERR to close it (signal serious error)
1884  */
1885 static int
1886 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1887                       struct GNUNET_MESH_Tunnel *tunnel,
1888                       const struct GNUNET_PeerIdentity *sender,
1889                       const struct GNUNET_STREAM_MessageHeader *message,
1890                       const struct GNUNET_ATS_Information *atsi)
1891 {
1892   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1893
1894   switch (socket->state)
1895   {
1896   case STATE_INIT:
1897   case STATE_LISTEN:
1898   case STATE_HELLO_WAIT:
1899     LOG (GNUNET_ERROR_TYPE_DEBUG,
1900          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1901          GNUNET_i2s (&socket->other_peer));
1902     return GNUNET_OK;
1903   default:
1904     break;
1905   }
1906   
1907   LOG (GNUNET_ERROR_TYPE_DEBUG,
1908        "%s: Received RECEIVE_CLOSE from %s\n",
1909        GNUNET_i2s (&socket->other_peer),
1910        GNUNET_i2s (&socket->other_peer));
1911   receive_close_ack =
1912     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1913   receive_close_ack->header.size =
1914     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1915   receive_close_ack->header.type =
1916     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1917   queue_message (socket,
1918                  receive_close_ack,
1919                  &set_state_closed,
1920                  NULL);
1921   
1922   /* FIXME: Handle the case where write handle is present; the write operation
1923      should be deemed as finised and the write continuation callback
1924      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1925   return GNUNET_OK;
1926 }
1927
1928
1929 /**
1930  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1931  *
1932  * @param cls the socket (set from GNUNET_MESH_connect)
1933  * @param tunnel connection to the other end
1934  * @param tunnel_ctx this is NULL
1935  * @param sender who sent the message
1936  * @param message the actual message
1937  * @param atsi performance data for the connection
1938  * @return GNUNET_OK to keep the connection open,
1939  *         GNUNET_SYSERR to close it (signal serious error)
1940  */
1941 static int
1942 client_handle_receive_close (void *cls,
1943                              struct GNUNET_MESH_Tunnel *tunnel,
1944                              void **tunnel_ctx,
1945                              const struct GNUNET_PeerIdentity *sender,
1946                              const struct GNUNET_MessageHeader *message,
1947                              const struct GNUNET_ATS_Information*atsi)
1948 {
1949   struct GNUNET_STREAM_Socket *socket = cls;
1950
1951   return
1952     handle_receive_close (socket,
1953                           tunnel,
1954                           sender,
1955                           (const struct GNUNET_STREAM_MessageHeader *) message,
1956                           atsi);
1957 }
1958
1959
1960 /**
1961  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1962  *
1963  * @param cls the socket (set from GNUNET_MESH_connect)
1964  * @param tunnel connection to the other end
1965  * @param tunnel_ctx this is NULL
1966  * @param sender who sent the message
1967  * @param message the actual message
1968  * @param atsi performance data for the connection
1969  * @return GNUNET_OK to keep the connection open,
1970  *         GNUNET_SYSERR to close it (signal serious error)
1971  */
1972 static int
1973 client_handle_receive_close_ack (void *cls,
1974                                  struct GNUNET_MESH_Tunnel *tunnel,
1975                                  void **tunnel_ctx,
1976                                  const struct GNUNET_PeerIdentity *sender,
1977                                  const struct GNUNET_MessageHeader *message,
1978                                  const struct GNUNET_ATS_Information*atsi)
1979 {
1980   struct GNUNET_STREAM_Socket *socket = cls;
1981
1982   return handle_generic_close_ack (socket,
1983                                    tunnel,
1984                                    sender,
1985                                    (const struct GNUNET_STREAM_MessageHeader *)
1986                                    message,
1987                                    atsi,
1988                                    SHUT_RD);
1989 }
1990
1991
1992 /**
1993  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1994  *
1995  * @param socket the socket
1996  * @param tunnel connection to the other end
1997  * @param sender who sent the message
1998  * @param message the actual message
1999  * @param atsi performance data for the connection
2000  * @return GNUNET_OK to keep the connection open,
2001  *         GNUNET_SYSERR to close it (signal serious error)
2002  */
2003 static int
2004 handle_close (struct GNUNET_STREAM_Socket *socket,
2005               struct GNUNET_MESH_Tunnel *tunnel,
2006               const struct GNUNET_PeerIdentity *sender,
2007               const struct GNUNET_STREAM_MessageHeader *message,
2008               const struct GNUNET_ATS_Information*atsi)
2009 {
2010   struct GNUNET_STREAM_MessageHeader *close_ack;
2011
2012   switch (socket->state)
2013   {
2014   case STATE_INIT:
2015   case STATE_LISTEN:
2016   case STATE_HELLO_WAIT:
2017     LOG (GNUNET_ERROR_TYPE_DEBUG,
2018          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2019          GNUNET_i2s (&socket->other_peer));
2020     return GNUNET_OK;
2021   default:
2022     break;
2023   }
2024
2025   LOG (GNUNET_ERROR_TYPE_DEBUG,
2026        "%s: Received CLOSE from %s\n",
2027        GNUNET_i2s (&socket->other_peer),
2028        GNUNET_i2s (&socket->other_peer));
2029   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2030   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2031   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2032   queue_message (socket,
2033                  close_ack,
2034                  &set_state_closed,
2035                  NULL);
2036   if (socket->state == STATE_CLOSED)
2037     return GNUNET_OK;
2038
2039   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2040   socket->receive_buffer = NULL;
2041   socket->receive_buffer_size = 0;
2042   return GNUNET_OK;
2043 }
2044
2045
2046 /**
2047  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2048  *
2049  * @param cls the socket (set from GNUNET_MESH_connect)
2050  * @param tunnel connection to the other end
2051  * @param tunnel_ctx this is NULL
2052  * @param sender who sent the message
2053  * @param message the actual message
2054  * @param atsi performance data for the connection
2055  * @return GNUNET_OK to keep the connection open,
2056  *         GNUNET_SYSERR to close it (signal serious error)
2057  */
2058 static int
2059 client_handle_close (void *cls,
2060                      struct GNUNET_MESH_Tunnel *tunnel,
2061                      void **tunnel_ctx,
2062                      const struct GNUNET_PeerIdentity *sender,
2063                      const struct GNUNET_MessageHeader *message,
2064                      const struct GNUNET_ATS_Information*atsi)
2065 {
2066   struct GNUNET_STREAM_Socket *socket = cls;
2067
2068   return handle_close (socket,
2069                        tunnel,
2070                        sender,
2071                        (const struct GNUNET_STREAM_MessageHeader *) message,
2072                        atsi);
2073 }
2074
2075
2076 /**
2077  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2078  *
2079  * @param cls the socket (set from GNUNET_MESH_connect)
2080  * @param tunnel connection to the other end
2081  * @param tunnel_ctx this is NULL
2082  * @param sender who sent the message
2083  * @param message the actual message
2084  * @param atsi performance data for the connection
2085  * @return GNUNET_OK to keep the connection open,
2086  *         GNUNET_SYSERR to close it (signal serious error)
2087  */
2088 static int
2089 client_handle_close_ack (void *cls,
2090                          struct GNUNET_MESH_Tunnel *tunnel,
2091                          void **tunnel_ctx,
2092                          const struct GNUNET_PeerIdentity *sender,
2093                          const struct GNUNET_MessageHeader *message,
2094                          const struct GNUNET_ATS_Information *atsi)
2095 {
2096   struct GNUNET_STREAM_Socket *socket = cls;
2097
2098   return handle_generic_close_ack (socket,
2099                                    tunnel,
2100                                    sender,
2101                                    (const struct GNUNET_STREAM_MessageHeader *) 
2102                                    message,
2103                                    atsi,
2104                                    SHUT_RDWR);
2105 }
2106
2107 /*****************************/
2108 /* Server's Message Handlers */
2109 /*****************************/
2110
2111 /**
2112  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2113  *
2114  * @param cls the closure
2115  * @param tunnel connection to the other end
2116  * @param tunnel_ctx the socket
2117  * @param sender who sent the message
2118  * @param message the actual message
2119  * @param atsi performance data for the connection
2120  * @return GNUNET_OK to keep the connection open,
2121  *         GNUNET_SYSERR to close it (signal serious error)
2122  */
2123 static int
2124 server_handle_data (void *cls,
2125                     struct GNUNET_MESH_Tunnel *tunnel,
2126                     void **tunnel_ctx,
2127                     const struct GNUNET_PeerIdentity *sender,
2128                     const struct GNUNET_MessageHeader *message,
2129                     const struct GNUNET_ATS_Information*atsi)
2130 {
2131   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2132
2133   return handle_data (socket,
2134                       tunnel,
2135                       sender,
2136                       (const struct GNUNET_STREAM_DataMessage *)message,
2137                       atsi);
2138 }
2139
2140
2141 /**
2142  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2143  *
2144  * @param cls the closure
2145  * @param tunnel connection to the other end
2146  * @param tunnel_ctx the socket
2147  * @param sender who sent the message
2148  * @param message the actual message
2149  * @param atsi performance data for the connection
2150  * @return GNUNET_OK to keep the connection open,
2151  *         GNUNET_SYSERR to close it (signal serious error)
2152  */
2153 static int
2154 server_handle_hello (void *cls,
2155                      struct GNUNET_MESH_Tunnel *tunnel,
2156                      void **tunnel_ctx,
2157                      const struct GNUNET_PeerIdentity *sender,
2158                      const struct GNUNET_MessageHeader *message,
2159                      const struct GNUNET_ATS_Information*atsi)
2160 {
2161   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2162   struct GNUNET_STREAM_HelloAckMessage *reply;
2163
2164   if (0 != memcmp (sender,
2165                    &socket->other_peer,
2166                    sizeof (struct GNUNET_PeerIdentity)))
2167   {
2168     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2169                GNUNET_i2s (&socket->other_peer));
2170     return GNUNET_YES;
2171   }
2172   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2173   GNUNET_assert (socket->tunnel == tunnel);
2174   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2175              GNUNET_i2s (&socket->other_peer));
2176   switch (socket->status)
2177   {
2178   case STATE_INIT:
2179     reply = generate_hello_ack (socket, GNUNET_YES);
2180     queue_message (socket, &reply->header, &set_state_hello_wait, NULL);
2181     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2182                    socket->control_retransmission_task_id);
2183     socket->control_retransmission_task_id =
2184       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2185                                     &control_retransmission_task, socket);
2186     break;
2187   case STATE_HELLO_WAIT:
2188     /* Perhaps our HELLO_ACK was lost */
2189     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2190                    socket->control_retransmission_task_id);
2191     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2192     socket->control_retransmission_task_id =
2193       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2194     break;
2195   default:
2196     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2197                GNUNET_i2s (&socket->other_peer), socket->state);
2198     /* FIXME: Send RESET? */
2199   }
2200   return GNUNET_OK;
2201 }
2202
2203
2204 /**
2205  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2206  *
2207  * @param cls the closure
2208  * @param tunnel connection to the other end
2209  * @param tunnel_ctx the socket
2210  * @param sender who sent the message
2211  * @param message the actual message
2212  * @param atsi performance data for the connection
2213  * @return GNUNET_OK to keep the connection open,
2214  *         GNUNET_SYSERR to close it (signal serious error)
2215  */
2216 static int
2217 server_handle_hello_ack (void *cls,
2218                          struct GNUNET_MESH_Tunnel *tunnel,
2219                          void **tunnel_ctx,
2220                          const struct GNUNET_PeerIdentity *sender,
2221                          const struct GNUNET_MessageHeader *message,
2222                          const struct GNUNET_ATS_Information*atsi)
2223 {
2224   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2225   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2226
2227   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2228                  ntohs (message->type));
2229   GNUNET_assert (socket->tunnel == tunnel);
2230   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2231   switch (socket->state)  
2232   {
2233   case STATE_HELLO_WAIT:
2234     LOG (GNUNET_ERROR_TYPE_DEBUG,
2235          "%s: Received HELLO_ACK from %s\n",
2236          GNUNET_i2s (&socket->other_peer),
2237          GNUNET_i2s (&socket->other_peer));
2238     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2239     LOG (GNUNET_ERROR_TYPE_DEBUG,
2240          "%s: Read sequence number %u\n",
2241          GNUNET_i2s (&socket->other_peer),
2242          (unsigned int) socket->read_sequence_number);
2243     socket->receiver_window_available = 
2244       ntohl (ack_message->receiver_window_size);
2245     set_state_established (NULL, socket);
2246     break;
2247   default:
2248     LOG (GNUNET_ERROR_TYPE_DEBUG,
2249          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2250   }
2251   return GNUNET_OK;
2252 }
2253
2254
2255 /**
2256  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2257  *
2258  * @param cls the closure
2259  * @param tunnel connection to the other end
2260  * @param tunnel_ctx the socket
2261  * @param sender who sent the message
2262  * @param message the actual message
2263  * @param atsi performance data for the connection
2264  * @return GNUNET_OK to keep the connection open,
2265  *         GNUNET_SYSERR to close it (signal serious error)
2266  */
2267 static int
2268 server_handle_reset (void *cls,
2269                      struct GNUNET_MESH_Tunnel *tunnel,
2270                      void **tunnel_ctx,
2271                      const struct GNUNET_PeerIdentity *sender,
2272                      const struct GNUNET_MessageHeader *message,
2273                      const struct GNUNET_ATS_Information*atsi)
2274 {
2275   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2276
2277   return GNUNET_OK;
2278 }
2279
2280
2281 /**
2282  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2283  *
2284  * @param cls the closure
2285  * @param tunnel connection to the other end
2286  * @param tunnel_ctx the socket
2287  * @param sender who sent the message
2288  * @param message the actual message
2289  * @param atsi performance data for the connection
2290  * @return GNUNET_OK to keep the connection open,
2291  *         GNUNET_SYSERR to close it (signal serious error)
2292  */
2293 static int
2294 server_handle_transmit_close (void *cls,
2295                               struct GNUNET_MESH_Tunnel *tunnel,
2296                               void **tunnel_ctx,
2297                               const struct GNUNET_PeerIdentity *sender,
2298                               const struct GNUNET_MessageHeader *message,
2299                               const struct GNUNET_ATS_Information*atsi)
2300 {
2301   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2302
2303   return handle_transmit_close (socket,
2304                                 tunnel,
2305                                 sender,
2306                                 (struct GNUNET_STREAM_MessageHeader *)message,
2307                                 atsi);
2308 }
2309
2310
2311 /**
2312  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2313  *
2314  * @param cls the closure
2315  * @param tunnel connection to the other end
2316  * @param tunnel_ctx the socket
2317  * @param sender who sent the message
2318  * @param message the actual message
2319  * @param atsi performance data for the connection
2320  * @return GNUNET_OK to keep the connection open,
2321  *         GNUNET_SYSERR to close it (signal serious error)
2322  */
2323 static int
2324 server_handle_transmit_close_ack (void *cls,
2325                                   struct GNUNET_MESH_Tunnel *tunnel,
2326                                   void **tunnel_ctx,
2327                                   const struct GNUNET_PeerIdentity *sender,
2328                                   const struct GNUNET_MessageHeader *message,
2329                                   const struct GNUNET_ATS_Information*atsi)
2330 {
2331   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2332
2333   return handle_generic_close_ack (socket,
2334                                    tunnel,
2335                                    sender,
2336                                    (const struct GNUNET_STREAM_MessageHeader *)
2337                                    message,
2338                                    atsi,
2339                                    SHUT_WR);
2340 }
2341
2342
2343 /**
2344  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2345  *
2346  * @param cls the closure
2347  * @param tunnel connection to the other end
2348  * @param tunnel_ctx the socket
2349  * @param sender who sent the message
2350  * @param message the actual message
2351  * @param atsi performance data for the connection
2352  * @return GNUNET_OK to keep the connection open,
2353  *         GNUNET_SYSERR to close it (signal serious error)
2354  */
2355 static int
2356 server_handle_receive_close (void *cls,
2357                              struct GNUNET_MESH_Tunnel *tunnel,
2358                              void **tunnel_ctx,
2359                              const struct GNUNET_PeerIdentity *sender,
2360                              const struct GNUNET_MessageHeader *message,
2361                              const struct GNUNET_ATS_Information*atsi)
2362 {
2363   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2364
2365   return
2366     handle_receive_close (socket,
2367                           tunnel,
2368                           sender,
2369                           (const struct GNUNET_STREAM_MessageHeader *) message,
2370                           atsi);
2371 }
2372
2373
2374 /**
2375  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2376  *
2377  * @param cls the closure
2378  * @param tunnel connection to the other end
2379  * @param tunnel_ctx the socket
2380  * @param sender who sent the message
2381  * @param message the actual message
2382  * @param atsi performance data for the connection
2383  * @return GNUNET_OK to keep the connection open,
2384  *         GNUNET_SYSERR to close it (signal serious error)
2385  */
2386 static int
2387 server_handle_receive_close_ack (void *cls,
2388                                  struct GNUNET_MESH_Tunnel *tunnel,
2389                                  void **tunnel_ctx,
2390                                  const struct GNUNET_PeerIdentity *sender,
2391                                  const struct GNUNET_MessageHeader *message,
2392                                  const struct GNUNET_ATS_Information*atsi)
2393 {
2394   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2395
2396   return handle_generic_close_ack (socket,
2397                                    tunnel,
2398                                    sender,
2399                                    (const struct GNUNET_STREAM_MessageHeader *)
2400                                    message,
2401                                    atsi,
2402                                    SHUT_RD);
2403 }
2404
2405
2406 /**
2407  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2408  *
2409  * @param cls the listen socket (from GNUNET_MESH_connect in
2410  *          GNUNET_STREAM_listen) 
2411  * @param tunnel connection to the other end
2412  * @param tunnel_ctx the socket
2413  * @param sender who sent the message
2414  * @param message the actual message
2415  * @param atsi performance data for the connection
2416  * @return GNUNET_OK to keep the connection open,
2417  *         GNUNET_SYSERR to close it (signal serious error)
2418  */
2419 static int
2420 server_handle_close (void *cls,
2421                      struct GNUNET_MESH_Tunnel *tunnel,
2422                      void **tunnel_ctx,
2423                      const struct GNUNET_PeerIdentity *sender,
2424                      const struct GNUNET_MessageHeader *message,
2425                      const struct GNUNET_ATS_Information*atsi)
2426 {
2427   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2428   
2429   return handle_close (socket,
2430                        tunnel,
2431                        sender,
2432                        (const struct GNUNET_STREAM_MessageHeader *) message,
2433                        atsi);
2434 }
2435
2436
2437 /**
2438  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2439  *
2440  * @param cls the closure
2441  * @param tunnel connection to the other end
2442  * @param tunnel_ctx the socket
2443  * @param sender who sent the message
2444  * @param message the actual message
2445  * @param atsi performance data for the connection
2446  * @return GNUNET_OK to keep the connection open,
2447  *         GNUNET_SYSERR to close it (signal serious error)
2448  */
2449 static int
2450 server_handle_close_ack (void *cls,
2451                          struct GNUNET_MESH_Tunnel *tunnel,
2452                          void **tunnel_ctx,
2453                          const struct GNUNET_PeerIdentity *sender,
2454                          const struct GNUNET_MessageHeader *message,
2455                          const struct GNUNET_ATS_Information*atsi)
2456 {
2457   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2458
2459   return handle_generic_close_ack (socket,
2460                                    tunnel,
2461                                    sender,
2462                                    (const struct GNUNET_STREAM_MessageHeader *) 
2463                                    message,
2464                                    atsi,
2465                                    SHUT_RDWR);
2466 }
2467
2468
2469 /**
2470  * Handler for DATA_ACK messages
2471  *
2472  * @param socket the socket through which the ack was received
2473  * @param tunnel connection to the other end
2474  * @param sender who sent the message
2475  * @param ack the acknowledgment message
2476  * @param atsi performance data for the connection
2477  * @return GNUNET_OK to keep the connection open,
2478  *         GNUNET_SYSERR to close it (signal serious error)
2479  */
2480 static int
2481 handle_ack (struct GNUNET_STREAM_Socket *socket,
2482             struct GNUNET_MESH_Tunnel *tunnel,
2483             const struct GNUNET_PeerIdentity *sender,
2484             const struct GNUNET_STREAM_AckMessage *ack,
2485             const struct GNUNET_ATS_Information*atsi)
2486 {
2487   unsigned int packet;
2488   int need_retransmission;
2489   uint32_t sequence_difference;
2490   
2491   if (0 != memcmp (sender,
2492                    &socket->other_peer,
2493                    sizeof (struct GNUNET_PeerIdentity)))
2494   {
2495     LOG (GNUNET_ERROR_TYPE_DEBUG,
2496          "%s: Received ACK from non-confirming peer\n",
2497          GNUNET_i2s (&socket->other_peer));
2498     return GNUNET_YES;
2499   }
2500   switch (socket->state)
2501   {
2502   case (STATE_ESTABLISHED):
2503   case (STATE_RECEIVE_CLOSED):
2504   case (STATE_RECEIVE_CLOSE_WAIT):
2505     if (NULL == socket->write_handle)
2506     {
2507       LOG (GNUNET_ERROR_TYPE_DEBUG,
2508            "%s: Received DATA_ACK when write_handle is NULL\n",
2509            GNUNET_i2s (&socket->other_peer));
2510       return GNUNET_OK;
2511     }
2512     if (!((socket->write_sequence_number 
2513            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2514     {
2515       LOG (GNUNET_ERROR_TYPE_DEBUG,
2516            "%s: Received DATA_ACK with unexpected base sequence number\n",
2517            GNUNET_i2s (&socket->other_peer));
2518       LOG (GNUNET_ERROR_TYPE_DEBUG,
2519            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2520            GNUNET_i2s (&socket->other_peer),
2521            socket->write_sequence_number,
2522            ntohl (ack->base_sequence_number));
2523       return GNUNET_OK;
2524     }
2525     /* FIXME: include the case when write_handle is cancelled - ignore the 
2526        acks */
2527     LOG (GNUNET_ERROR_TYPE_DEBUG,
2528          "%s: Received DATA_ACK from %s\n",
2529          GNUNET_i2s (&socket->other_peer),
2530          GNUNET_i2s (&socket->other_peer));
2531       
2532     /* Cancel the retransmission task */
2533     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2534     {
2535       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2536       socket->data_retransmission_task_id = 
2537         GNUNET_SCHEDULER_NO_TASK;
2538     }
2539     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2540     {
2541       if (NULL == socket->write_handle->messages[packet]) break;
2542       /* BS: Base sequence from ack; PS: sequence num of current packet */
2543       sequence_difference = ntohl (ack->base_sequence_number)
2544         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2545       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2546       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2547           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2548               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2549       {
2550         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2551                               GNUNET_YES);
2552         continue;
2553       }
2554       if (GNUNET_YES == 
2555           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2556                                 -sequence_difference))/*inversion as PS >= BS */
2557       {
2558         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2559                               GNUNET_YES);
2560       }
2561     }
2562     /* Update the receive window remaining
2563        FIXME : Should update with the value from a data ack with greater
2564        sequence number */
2565     socket->receiver_window_available = 
2566       ntohl (ack->receive_window_remaining);
2567     /* Check if we have received all acknowledgements */
2568     need_retransmission = GNUNET_NO;
2569     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2570     {
2571       if (NULL == socket->write_handle->messages[packet]) break;
2572       if (GNUNET_YES != ackbitmap_is_bit_set 
2573           (&socket->write_handle->ack_bitmap,packet))
2574       {
2575         need_retransmission = GNUNET_YES;
2576         break;
2577       }
2578     }
2579     if (GNUNET_YES == need_retransmission)
2580     {
2581       write_data (socket);
2582     }
2583     else      /* We have to call the write continuation callback now */
2584     {
2585       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2586       
2587       /* Free the packets */
2588       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2589       {
2590         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2591       }
2592       write_handle = socket->write_handle;
2593       socket->write_handle = NULL;
2594       if (NULL != write_handle->write_cont)
2595         write_handle->write_cont (write_handle->write_cont_cls,
2596                                   socket->status,
2597                                   write_handle->size);
2598       /* We are done with the write handle - Freeing it */
2599       GNUNET_free (write_handle);
2600       LOG (GNUNET_ERROR_TYPE_DEBUG,
2601            "%s: Write completion callback completed\n",
2602            GNUNET_i2s (&socket->other_peer));      
2603     }
2604     break;
2605   default:
2606     break;
2607   }
2608   return GNUNET_OK;
2609 }
2610
2611
2612 /**
2613  * Handler for DATA_ACK messages
2614  *
2615  * @param cls the 'struct GNUNET_STREAM_Socket'
2616  * @param tunnel connection to the other end
2617  * @param tunnel_ctx unused
2618  * @param sender who sent the message
2619  * @param message the actual message
2620  * @param atsi performance data for the connection
2621  * @return GNUNET_OK to keep the connection open,
2622  *         GNUNET_SYSERR to close it (signal serious error)
2623  */
2624 static int
2625 client_handle_ack (void *cls,
2626                    struct GNUNET_MESH_Tunnel *tunnel,
2627                    void **tunnel_ctx,
2628                    const struct GNUNET_PeerIdentity *sender,
2629                    const struct GNUNET_MessageHeader *message,
2630                    const struct GNUNET_ATS_Information*atsi)
2631 {
2632   struct GNUNET_STREAM_Socket *socket = cls;
2633   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2634  
2635   return handle_ack (socket, tunnel, sender, ack, atsi);
2636 }
2637
2638
2639 /**
2640  * Handler for DATA_ACK messages
2641  *
2642  * @param cls the server's listen socket
2643  * @param tunnel connection to the other end
2644  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2645  * @param sender who sent the message
2646  * @param message the actual message
2647  * @param atsi performance data for the connection
2648  * @return GNUNET_OK to keep the connection open,
2649  *         GNUNET_SYSERR to close it (signal serious error)
2650  */
2651 static int
2652 server_handle_ack (void *cls,
2653                    struct GNUNET_MESH_Tunnel *tunnel,
2654                    void **tunnel_ctx,
2655                    const struct GNUNET_PeerIdentity *sender,
2656                    const struct GNUNET_MessageHeader *message,
2657                    const struct GNUNET_ATS_Information*atsi)
2658 {
2659   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2660   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2661  
2662   return handle_ack (socket, tunnel, sender, ack, atsi);
2663 }
2664
2665
2666 /**
2667  * For client message handlers, the stream socket is in the
2668  * closure argument.
2669  */
2670 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2671   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2672   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2673    sizeof (struct GNUNET_STREAM_AckMessage) },
2674   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2675    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2676   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2677    sizeof (struct GNUNET_STREAM_MessageHeader)},
2678   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2679    sizeof (struct GNUNET_STREAM_MessageHeader)},
2680   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2681    sizeof (struct GNUNET_STREAM_MessageHeader)},
2682   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2683    sizeof (struct GNUNET_STREAM_MessageHeader)},
2684   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2685    sizeof (struct GNUNET_STREAM_MessageHeader)},
2686   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2687    sizeof (struct GNUNET_STREAM_MessageHeader)},
2688   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2689    sizeof (struct GNUNET_STREAM_MessageHeader)},
2690   {NULL, 0, 0}
2691 };
2692
2693
2694 /**
2695  * For server message handlers, the stream socket is in the
2696  * tunnel context, and the listen socket in the closure argument.
2697  */
2698 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2699   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2700   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2701    sizeof (struct GNUNET_STREAM_AckMessage) },
2702   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2703    sizeof (struct GNUNET_STREAM_MessageHeader)},
2704   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2705    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2706   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2707    sizeof (struct GNUNET_STREAM_MessageHeader)},
2708   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2709    sizeof (struct GNUNET_STREAM_MessageHeader)},
2710   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2711    sizeof (struct GNUNET_STREAM_MessageHeader)},
2712   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2713    sizeof (struct GNUNET_STREAM_MessageHeader)},
2714   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2715    sizeof (struct GNUNET_STREAM_MessageHeader)},
2716   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2717    sizeof (struct GNUNET_STREAM_MessageHeader)},
2718   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2719    sizeof (struct GNUNET_STREAM_MessageHeader)},
2720   {NULL, 0, 0}
2721 };
2722
2723
2724 /**
2725  * Function called when our target peer is connected to our tunnel
2726  *
2727  * @param cls the socket for which this tunnel is created
2728  * @param peer the peer identity of the target
2729  * @param atsi performance data for the connection
2730  */
2731 static void
2732 mesh_peer_connect_callback (void *cls,
2733                             const struct GNUNET_PeerIdentity *peer,
2734                             const struct GNUNET_ATS_Information * atsi)
2735 {
2736   struct GNUNET_STREAM_Socket *socket = cls;
2737   struct GNUNET_STREAM_MessageHeader *message;
2738   
2739   if (0 != memcmp (peer,
2740                    &socket->other_peer,
2741                    sizeof (struct GNUNET_PeerIdentity)))
2742   {
2743     LOG (GNUNET_ERROR_TYPE_DEBUG,
2744          "%s: A peer which is not our target has connected to our tunnel\n",
2745          GNUNET_i2s(peer));
2746     return;
2747   }
2748   LOG (GNUNET_ERROR_TYPE_DEBUG,
2749        "%s: Target peer %s connected\n",
2750        GNUNET_i2s (&socket->other_peer),
2751        GNUNET_i2s (&socket->other_peer));
2752   /* Set state to INIT */
2753   socket->state = STATE_INIT;
2754   /* Send HELLO message */
2755   message = generate_hello ();
2756   queue_message (socket,
2757                  message,
2758                  &set_state_hello_wait,
2759                  NULL);
2760   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2761                  socket->control_retransmission_task_id);
2762   socket->control_retransmission_task_id =
2763     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2764                                   &control_retransmission_task, socket);
2765 }
2766
2767
2768 /**
2769  * Function called when our target peer is disconnected from our tunnel
2770  *
2771  * @param cls the socket associated which this tunnel
2772  * @param peer the peer identity of the target
2773  */
2774 static void
2775 mesh_peer_disconnect_callback (void *cls,
2776                                const struct GNUNET_PeerIdentity *peer)
2777 {
2778   struct GNUNET_STREAM_Socket *socket=cls;
2779   
2780   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2781   LOG (GNUNET_ERROR_TYPE_DEBUG,
2782        "%s: Other peer %s disconnected \n",
2783        GNUNET_i2s (&socket->other_peer),
2784        GNUNET_i2s (&socket->other_peer));
2785 }
2786
2787
2788 /**
2789  * Method called whenever a peer creates a tunnel to us
2790  *
2791  * @param cls closure
2792  * @param tunnel new handle to the tunnel
2793  * @param initiator peer that started the tunnel
2794  * @param atsi performance information for the tunnel
2795  * @return initial tunnel context for the tunnel
2796  *         (can be NULL -- that's not an error)
2797  */
2798 static void *
2799 new_tunnel_notify (void *cls,
2800                    struct GNUNET_MESH_Tunnel *tunnel,
2801                    const struct GNUNET_PeerIdentity *initiator,
2802                    const struct GNUNET_ATS_Information *atsi)
2803 {
2804   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2805   struct GNUNET_STREAM_Socket *socket;
2806
2807   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2808      from the same peer again until the socket is closed */
2809
2810   if (GNUNET_NO == lsocket->listening)
2811   {
2812 //     FIXME: socket uninitalized
2813 //     FIXME: cannot use GNUNET_i2s twice in same call (static buffer)
2814 //     LOG (GNUNET_ERROR_TYPE_DEBUG,
2815 //          "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2816 //          GNUNET_i2s (&socket->other_peer),
2817 //          GNUNET_i2s (&socket->other_peer));
2818     GNUNET_MESH_tunnel_destroy (tunnel);
2819     return NULL;
2820   }
2821   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2822   socket->other_peer = *initiator;
2823   socket->tunnel = tunnel;
2824   socket->session_id = 0;       /* FIXME */
2825   socket->state = STATE_INIT;
2826   socket->lsocket = lsocket;
2827   socket->retransmit_timeout = lsocket->retransmit_timeout;
2828   socket->testing_active = lsocket->testing_active;
2829   socket->testing_set_write_sequence_number_value =
2830     lsocket->testing_set_write_sequence_number_value;    
2831   LOG (GNUNET_ERROR_TYPE_DEBUG,
2832        "%s: Peer %s initiated tunnel to us\n", 
2833        GNUNET_i2s (&socket->other_peer),
2834        GNUNET_i2s (&socket->other_peer));
2835   /* FIXME: Copy MESH handle from lsocket to socket */
2836   return socket;
2837 }
2838
2839
2840 /**
2841  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2842  * any associated state.  This function is NOT called if the client has
2843  * explicitly asked for the tunnel to be destroyed using
2844  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2845  * the tunnel.
2846  *
2847  * @param cls closure (set from GNUNET_MESH_connect)
2848  * @param tunnel connection to the other end (henceforth invalid)
2849  * @param tunnel_ctx place where local state associated
2850  *                   with the tunnel is stored
2851  */
2852 static void 
2853 tunnel_cleaner (void *cls,
2854                 const struct GNUNET_MESH_Tunnel *tunnel,
2855                 void *tunnel_ctx)
2856 {
2857   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2858
2859   if (tunnel != socket->tunnel)
2860     return;
2861
2862   GNUNET_break_op(0);
2863   LOG (GNUNET_ERROR_TYPE_DEBUG,
2864        "%s: Peer %s has terminated connection abruptly\n",
2865        GNUNET_i2s (&socket->other_peer),
2866        GNUNET_i2s (&socket->other_peer));
2867
2868   socket->status = GNUNET_STREAM_SHUTDOWN;
2869
2870   /* Clear Transmit handles */
2871   if (NULL != socket->transmit_handle)
2872   {
2873     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2874     socket->transmit_handle = NULL;
2875   }
2876   if (NULL != socket->ack_transmit_handle)
2877   {
2878     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2879     GNUNET_free (socket->ack_msg);
2880     socket->ack_msg = NULL;
2881     socket->ack_transmit_handle = NULL;
2882   }
2883   /* Stop Tasks using socket->tunnel */
2884   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2885   {
2886     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2887     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2888   }
2889   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2890   {
2891     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2892     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2893   }
2894   /* FIXME: Cancel all other tasks using socket->tunnel */
2895   socket->tunnel = NULL;
2896 }
2897
2898
2899 /**
2900  * Callback to signal timeout on lockmanager lock acquire
2901  *
2902  * @param cls the ListenSocket
2903  * @param tc the scheduler task context
2904  */
2905 static void
2906 lockmanager_acquire_timeout (void *cls, 
2907                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2908 {
2909   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2910   GNUNET_STREAM_ListenCallback listen_cb;
2911   void *listen_cb_cls;
2912
2913   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2914   listen_cb = lsocket->listen_cb;
2915   listen_cb_cls = lsocket->listen_cb_cls;
2916   GNUNET_STREAM_listen_close (lsocket);
2917   if (NULL != listen_cb)
2918     listen_cb (listen_cb_cls, NULL, NULL);  
2919 }
2920
2921
2922 /**
2923  * Callback to notify us on the status changes on app_port lock
2924  *
2925  * @param cls the ListenSocket
2926  * @param domain the domain name of the lock
2927  * @param lock the app_port
2928  * @param status the current status of the lock
2929  */
2930 static void
2931 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2932                        enum GNUNET_LOCKMANAGER_Status status)
2933 {
2934   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2935
2936   GNUNET_assert (lock == (uint32_t) lsocket->port);
2937   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2938   {
2939     lsocket->listening = GNUNET_YES;
2940     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2941     {
2942       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2943       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2944     }
2945     if (NULL == lsocket->mesh)
2946     {
2947       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2948
2949       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2950                                            lsocket, /* Closure */
2951                                            &new_tunnel_notify,
2952                                            &tunnel_cleaner,
2953                                            server_message_handlers,
2954                                            ports);
2955       GNUNET_assert (NULL != lsocket->mesh);
2956       if (NULL != lsocket->listen_ok_cb)
2957       {
2958         (void) lsocket->listen_ok_cb ();
2959       }
2960     }
2961   }
2962   if (GNUNET_LOCKMANAGER_RELEASE == status)
2963     lsocket->listening = GNUNET_NO;
2964 }
2965
2966
2967 /*****************/
2968 /* API functions */
2969 /*****************/
2970
2971
2972 /**
2973  * Tries to open a stream to the target peer
2974  *
2975  * @param cfg configuration to use
2976  * @param target the target peer to which the stream has to be opened
2977  * @param app_port the application port number which uniquely identifies this
2978  *            stream
2979  * @param open_cb this function will be called after stream has be established;
2980  *          cannot be NULL
2981  * @param open_cb_cls the closure for open_cb
2982  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2983  * @return if successful it returns the stream socket; NULL if stream cannot be
2984  *         opened 
2985  */
2986 struct GNUNET_STREAM_Socket *
2987 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2988                     const struct GNUNET_PeerIdentity *target,
2989                     GNUNET_MESH_ApplicationType app_port,
2990                     GNUNET_STREAM_OpenCallback open_cb,
2991                     void *open_cb_cls,
2992                     ...)
2993 {
2994   struct GNUNET_STREAM_Socket *socket;
2995   enum GNUNET_STREAM_Option option;
2996   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2997   va_list vargs;
2998
2999   LOG (GNUNET_ERROR_TYPE_DEBUG,
3000        "%s\n", __func__);
3001   GNUNET_assert (NULL != open_cb);
3002   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3003   socket->other_peer = *target;
3004   socket->open_cb = open_cb;
3005   socket->open_cls = open_cb_cls;
3006   /* Set defaults */
3007   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3008   socket->testing_active = GNUNET_NO;
3009   va_start (vargs, open_cb_cls); /* Parse variable args */
3010   do {
3011     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3012     switch (option)
3013     {
3014     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3015       /* Expect struct GNUNET_TIME_Relative */
3016       socket->retransmit_timeout = va_arg (vargs,
3017                                            struct GNUNET_TIME_Relative);
3018       break;
3019     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3020       socket->testing_active = GNUNET_YES;
3021       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3022                                                                 uint32_t);
3023       break;
3024     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3025       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3026       break;
3027     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3028       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3029       break;
3030     case GNUNET_STREAM_OPTION_END:
3031       break;
3032     }
3033   } while (GNUNET_STREAM_OPTION_END != option);
3034   va_end (vargs);               /* End of variable args parsing */
3035   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3036                                       socket, /* cls */
3037                                       NULL, /* No inbound tunnel handler */
3038                                       NULL, /* No in-tunnel cleaner */
3039                                       client_message_handlers,
3040                                       ports); /* We don't get inbound tunnels */
3041   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3042   {
3043     GNUNET_free (socket);
3044     return NULL;
3045   }
3046   /* Now create the mesh tunnel to target */
3047   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3048   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3049                                               NULL, /* Tunnel context */
3050                                               &mesh_peer_connect_callback,
3051                                               &mesh_peer_disconnect_callback,
3052                                               socket);
3053   GNUNET_assert (NULL != socket->tunnel);
3054   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3055                                         &socket->other_peer);
3056   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3057   return socket;
3058 }
3059
3060
3061 /**
3062  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3063  *
3064  * @param socket the stream socket
3065  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3066  * @param completion_cb the callback that will be called upon successful
3067  *          shutdown of given operation
3068  * @param completion_cls the closure for the completion callback
3069  * @return the shutdown handle
3070  */
3071 struct GNUNET_STREAM_ShutdownHandle *
3072 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3073                         int operation,
3074                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3075                         void *completion_cls)
3076 {
3077   struct GNUNET_STREAM_ShutdownHandle *handle;
3078   struct GNUNET_STREAM_MessageHeader *msg;
3079   
3080   GNUNET_assert (NULL == socket->shutdown_handle);
3081
3082   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3083   handle->socket = socket;
3084   handle->completion_cb = completion_cb;
3085   handle->completion_cls = completion_cls;
3086   socket->shutdown_handle = handle;
3087
3088   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3089   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3090   switch (operation)
3091   {
3092   case SHUT_RD:
3093     handle->operation = SHUT_RD;
3094     if (NULL != socket->read_handle)
3095       LOG (GNUNET_ERROR_TYPE_WARNING,
3096            "Existing read handle should be cancelled before shutting"
3097            " down reading\n");
3098     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3099     queue_message (socket,
3100                    msg,
3101                    &set_state_receive_close_wait,
3102                    NULL);
3103     break;
3104   case SHUT_WR:
3105     handle->operation = SHUT_WR;
3106     if (NULL != socket->write_handle)
3107       LOG (GNUNET_ERROR_TYPE_WARNING,
3108            "Existing write handle should be cancelled before shutting"
3109            " down writing\n");
3110     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3111     queue_message (socket,
3112                    msg,
3113                    &set_state_transmit_close_wait,
3114                    NULL);
3115     break;
3116   case SHUT_RDWR:
3117     handle->operation = SHUT_RDWR;
3118     if (NULL != socket->write_handle)
3119       LOG (GNUNET_ERROR_TYPE_WARNING,
3120            "Existing write handle should be cancelled before shutting"
3121            " down writing\n");
3122     if (NULL != socket->read_handle)
3123       LOG (GNUNET_ERROR_TYPE_WARNING,
3124            "Existing read handle should be cancelled before shutting"
3125            " down reading\n");
3126     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3127     queue_message (socket,
3128                    msg,
3129                    &set_state_close_wait,
3130                    NULL);
3131     break;
3132   default:
3133     LOG (GNUNET_ERROR_TYPE_WARNING,
3134          "GNUNET_STREAM_shutdown called with invalid value for "
3135          "parameter operation -- Ignoring\n");
3136     GNUNET_free (msg);
3137     GNUNET_free (handle);
3138     return NULL;
3139   }
3140   handle->close_msg_retransmission_task_id =
3141     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3142                                   &close_msg_retransmission_task,
3143                                   handle);
3144   return handle;
3145 }
3146
3147
3148 /**
3149  * Cancels a pending shutdown
3150  *
3151  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3152  */
3153 void
3154 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3155 {
3156   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3157     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3158   handle->socket->shutdown_handle = NULL;
3159   GNUNET_free (handle);
3160 }
3161
3162
3163 /**
3164  * Closes the stream
3165  *
3166  * @param socket the stream socket
3167  */
3168 void
3169 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3170 {
3171   struct MessageQueue *head;
3172
3173   if (NULL != socket->read_handle)
3174   {
3175     LOG (GNUNET_ERROR_TYPE_WARNING,
3176          "Closing STREAM socket when a read handle is pending\n");
3177   }
3178   if (NULL != socket->write_handle)
3179   {
3180     LOG (GNUNET_ERROR_TYPE_WARNING,
3181          "Closing STREAM socket when a write handle is pending\n");
3182     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3183     //socket->write_handle = NULL;
3184   }
3185   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3186   {
3187     /* socket closed with read task pending!? */
3188     GNUNET_break (0);
3189     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3190     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3191   }  
3192   /* Terminate the ack'ing tasks if they are still present */
3193   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3194   {
3195     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3196     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3197   }
3198   /* Terminate the control retransmission tasks */
3199   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3200   {
3201     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3202   }
3203   /* Clear Transmit handles */
3204   if (NULL != socket->transmit_handle)
3205   {
3206     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3207     socket->transmit_handle = NULL;
3208   }
3209   if (NULL != socket->ack_transmit_handle)
3210   {
3211     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3212     GNUNET_free (socket->ack_msg);
3213     socket->ack_msg = NULL;
3214     socket->ack_transmit_handle = NULL;
3215   }
3216   /* Clear existing message queue */
3217   while (NULL != (head = socket->queue_head)) {
3218     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3219                                  socket->queue_tail,
3220                                  head);
3221     GNUNET_free (head->message);
3222     GNUNET_free (head);
3223   }
3224
3225   /* Close associated tunnel */
3226   if (NULL != socket->tunnel)
3227   {
3228     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3229     socket->tunnel = NULL;
3230   }
3231
3232   /* Close mesh connection */
3233   if (NULL != socket->mesh && NULL == socket->lsocket)
3234   {
3235     GNUNET_MESH_disconnect (socket->mesh);
3236     socket->mesh = NULL;
3237   }
3238   
3239   /* Release receive buffer */
3240   if (NULL != socket->receive_buffer)
3241   {
3242     GNUNET_free (socket->receive_buffer);
3243   }
3244
3245   GNUNET_free (socket);
3246 }
3247
3248
3249 /**
3250  * Listens for stream connections for a specific application ports
3251  *
3252  * @param cfg the configuration to use
3253  * @param app_port the application port for which new streams will be accepted
3254  * @param listen_cb this function will be called when a peer tries to establish
3255  *            a stream with us
3256  * @param listen_cb_cls closure for listen_cb
3257  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3258  * @return listen socket, NULL for any error
3259  */
3260 struct GNUNET_STREAM_ListenSocket *
3261 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3262                       GNUNET_MESH_ApplicationType app_port,
3263                       GNUNET_STREAM_ListenCallback listen_cb,
3264                       void *listen_cb_cls,
3265                       ...)
3266 {
3267   /* FIXME: Add variable args for passing configration options? */
3268   struct GNUNET_STREAM_ListenSocket *lsocket;
3269   struct GNUNET_TIME_Relative listen_timeout;
3270   enum GNUNET_STREAM_Option option;
3271   va_list vargs;
3272
3273   GNUNET_assert (NULL != listen_cb);
3274   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3275   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3276   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3277   if (NULL == lsocket->lockmanager)
3278   {
3279     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3280     GNUNET_free (lsocket);
3281     return NULL;
3282   }
3283   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3284   /* Set defaults */
3285   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3286   lsocket->testing_active = GNUNET_NO;
3287   lsocket->listen_ok_cb = NULL;
3288   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3289   va_start (vargs, listen_cb_cls);
3290   do {
3291     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3292     switch (option)
3293     {
3294     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3295       lsocket->retransmit_timeout = va_arg (vargs,
3296                                             struct GNUNET_TIME_Relative);
3297       break;
3298     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3299       lsocket->testing_active = GNUNET_YES;
3300       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3301                                                                  uint32_t);
3302       break;
3303     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3304       listen_timeout = GNUNET_TIME_relative_multiply
3305         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3306       break;
3307     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3308       lsocket->listen_ok_cb = va_arg (vargs,
3309                                       GNUNET_STREAM_ListenSuccessCallback);
3310       break;
3311     case GNUNET_STREAM_OPTION_END:
3312       break;
3313     }
3314   } while (GNUNET_STREAM_OPTION_END != option);
3315   va_end (vargs);
3316   lsocket->port = app_port;
3317   lsocket->listen_cb = listen_cb;
3318   lsocket->listen_cb_cls = listen_cb_cls;
3319   lsocket->locking_request = 
3320     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3321                                      (uint32_t) lsocket->port,
3322                                      &lock_status_change_cb, lsocket);
3323   lsocket->lockmanager_acquire_timeout_task =
3324     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3325                                   &lockmanager_acquire_timeout, lsocket);
3326   return lsocket;
3327 }
3328
3329
3330 /**
3331  * Closes the listen socket
3332  *
3333  * @param lsocket the listen socket
3334  */
3335 void
3336 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3337 {
3338   /* Close MESH connection */
3339   if (NULL != lsocket->mesh)
3340     GNUNET_MESH_disconnect (lsocket->mesh);
3341   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3342   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3343     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3344   if (NULL != lsocket->locking_request)
3345     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3346   if (NULL != lsocket->lockmanager)
3347     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3348   GNUNET_free (lsocket);
3349 }
3350
3351
3352 /**
3353  * Tries to write the given data to the stream. The maximum size of data that
3354  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3355  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3356  * violation, however only the said number of maximum bytes will be written.
3357  *
3358  * @param socket the socket representing a stream
3359  * @param data the data buffer from where the data is written into the stream
3360  * @param size the number of bytes to be written from the data buffer
3361  * @param timeout the timeout period
3362  * @param write_cont the function to call upon writing some bytes into the
3363  *          stream 
3364  * @param write_cont_cls the closure
3365  *
3366  * @return handle to cancel the operation; if a previous write is pending or
3367  *           the stream has been shutdown for this operation then write_cont is
3368  *           immediately called and NULL is returned.
3369  */
3370 struct GNUNET_STREAM_IOWriteHandle *
3371 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3372                      const void *data,
3373                      size_t size,
3374                      struct GNUNET_TIME_Relative timeout,
3375                      GNUNET_STREAM_CompletionContinuation write_cont,
3376                      void *write_cont_cls)
3377 {
3378   unsigned int num_needed_packets;
3379   unsigned int packet;
3380   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3381   uint32_t packet_size;
3382   uint32_t payload_size;
3383   struct GNUNET_STREAM_DataMessage *data_msg;
3384   const void *sweep;
3385   struct GNUNET_TIME_Relative ack_deadline;
3386
3387   LOG (GNUNET_ERROR_TYPE_DEBUG,
3388        "%s\n", __func__);
3389
3390   /* Return NULL if there is already a write request pending */
3391   if (NULL != socket->write_handle)
3392   {
3393     GNUNET_break (0);
3394     return NULL;
3395   }
3396
3397   switch (socket->state)
3398   {
3399   case STATE_TRANSMIT_CLOSED:
3400   case STATE_TRANSMIT_CLOSE_WAIT:
3401   case STATE_CLOSED:
3402   case STATE_CLOSE_WAIT:
3403     if (NULL != write_cont)
3404       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3405     LOG (GNUNET_ERROR_TYPE_DEBUG,
3406          "%s() END\n", __func__);
3407     return NULL;
3408   case STATE_INIT:
3409   case STATE_LISTEN:
3410   case STATE_HELLO_WAIT:
3411     if (NULL != write_cont)
3412       /* FIXME: GNUNET_STREAM_SYSERR?? */
3413       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3414     LOG (GNUNET_ERROR_TYPE_DEBUG,
3415          "%s() END\n", __func__);
3416     return NULL;
3417   case STATE_ESTABLISHED:
3418   case STATE_RECEIVE_CLOSED:
3419   case STATE_RECEIVE_CLOSE_WAIT:
3420     break;
3421   }
3422
3423   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3424     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3425   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3426   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3427   io_handle->socket = socket;
3428   io_handle->write_cont = write_cont;
3429   io_handle->write_cont_cls = write_cont_cls;
3430   io_handle->size = size;
3431   sweep = data;
3432   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3433      determined from RTT */
3434   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3435   /* Divide the given buffer into packets for sending */
3436   for (packet=0; packet < num_needed_packets; packet++)
3437   {
3438     if ((packet + 1) * max_payload_size < size) 
3439     {
3440       payload_size = max_payload_size;
3441       packet_size = MAX_PACKET_SIZE;
3442     }
3443     else 
3444     {
3445       payload_size = size - packet * max_payload_size;
3446       packet_size =  payload_size + sizeof (struct
3447                                             GNUNET_STREAM_DataMessage); 
3448     }
3449     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3450     io_handle->messages[packet]->header.header.size = htons (packet_size);
3451     io_handle->messages[packet]->header.header.type =
3452       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3453     io_handle->messages[packet]->sequence_number =
3454       htonl (socket->write_sequence_number++);
3455     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3456
3457     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3458        determined from RTT */
3459     io_handle->messages[packet]->ack_deadline =
3460       GNUNET_TIME_relative_hton (ack_deadline);
3461     data_msg = io_handle->messages[packet];
3462     /* Copy data from given buffer to the packet */
3463     memcpy (&data_msg[1],
3464             sweep,
3465             payload_size);
3466     sweep += payload_size;
3467     socket->write_offset += payload_size;
3468   }
3469   socket->write_handle = io_handle;
3470   write_data (socket);
3471
3472   LOG (GNUNET_ERROR_TYPE_DEBUG,
3473        "%s() END\n", __func__);
3474
3475   return io_handle;
3476 }
3477
3478
3479 /**
3480  * Tries to read data from the stream.
3481  *
3482  * @param socket the socket representing a stream
3483  * @param timeout the timeout period
3484  * @param proc function to call with data (once only)
3485  * @param proc_cls the closure for proc
3486  *
3487  * @return handle to cancel the operation; if the stream has been shutdown for
3488  *           this type of opeartion then the DataProcessor is immediately
3489  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3490  */
3491 struct GNUNET_STREAM_IOReadHandle *
3492 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3493                     struct GNUNET_TIME_Relative timeout,
3494                     GNUNET_STREAM_DataProcessor proc,
3495                     void *proc_cls)
3496 {
3497   struct GNUNET_STREAM_IOReadHandle *read_handle;
3498   
3499   LOG (GNUNET_ERROR_TYPE_DEBUG,
3500        "%s: %s()\n", 
3501        GNUNET_i2s (&socket->other_peer),
3502        __func__);
3503   /* Return NULL if there is already a read handle; the user has to cancel that
3504      first before continuing or has to wait until it is completed */
3505   if (NULL != socket->read_handle) 
3506     return NULL;
3507   GNUNET_assert (NULL != proc);
3508   switch (socket->state)
3509   {
3510   case STATE_RECEIVE_CLOSED:
3511   case STATE_RECEIVE_CLOSE_WAIT:
3512   case STATE_CLOSED:
3513   case STATE_CLOSE_WAIT:
3514     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3515     LOG (GNUNET_ERROR_TYPE_DEBUG,
3516          "%s: %s() END\n",
3517          GNUNET_i2s (&socket->other_peer),
3518          __func__);
3519     return NULL;
3520   default:
3521     break;
3522   }
3523   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3524   read_handle->proc = proc;
3525   read_handle->proc_cls = proc_cls;
3526   read_handle->socket = socket;
3527   socket->read_handle = read_handle;
3528   /* Check if we have a packet at bitmap 0 */
3529   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3530                                           0))
3531   {
3532     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3533                                                      socket);   
3534   }
3535   /* Setup the read timeout task */
3536   socket->read_io_timeout_task_id =
3537     GNUNET_SCHEDULER_add_delayed (timeout,
3538                                   &read_io_timeout,
3539                                   socket);
3540   LOG (GNUNET_ERROR_TYPE_DEBUG,
3541        "%s: %s() END\n",
3542        GNUNET_i2s (&socket->other_peer),
3543        __func__);
3544   return read_handle;
3545 }
3546
3547
3548 /**
3549  * Cancel pending write operation.
3550  *
3551  * @param ioh handle to operation to cancel
3552  */
3553 void
3554 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3555 {
3556   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3557   unsigned int packet;
3558
3559   GNUNET_assert (NULL != socket->write_handle);
3560   GNUNET_assert (socket->write_handle == ioh);
3561
3562   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3563   {
3564     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3565     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3566   }
3567
3568   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3569   {
3570     if (NULL == ioh->messages[packet]) break;
3571     GNUNET_free (ioh->messages[packet]);
3572   }
3573       
3574   GNUNET_free (socket->write_handle);
3575   socket->write_handle = NULL;
3576 }
3577
3578
3579 /**
3580  * Cancel pending read operation.
3581  *
3582  * @param ioh handle to operation to cancel
3583  */
3584 void
3585 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3586 {
3587   struct GNUNET_STREAM_Socket *socket;
3588   
3589   socket = ioh->socket;
3590   GNUNET_assert (NULL != socket->read_handle);
3591   GNUNET_assert (ioh == socket->read_handle);
3592   /* Read io time task should be there; if it is already executed then this
3593   read handle is not valid */
3594   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
3595   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3596   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3597   /* reading task may be present; if so we have to stop it */
3598   if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
3599   {
3600     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3601     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3602   }
3603   GNUNET_free (ioh);
3604   socket->read_handle = NULL;
3605 }
3606
3607 /* end of stream_api.c */