-control retransmission handling
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define MAX_PACKET_SIZE 512//64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * The maximum payload a data message packet can carry
75  */
76 static const size_t max_payload_size = 
77   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
78
79 /**
80  * states in the Protocol
81  */
82 enum State
83   {
84     /**
85      * Client initialization state
86      */
87     STATE_INIT,
88
89     /**
90      * Listener initialization state 
91      */
92     STATE_LISTEN,
93
94     /**
95      * Pre-connection establishment state
96      */
97     STATE_HELLO_WAIT,
98
99     /**
100      * State where a connection has been established
101      */
102     STATE_ESTABLISHED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_RECEIVE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed for reading
111      */
112     STATE_RECEIVE_CLOSED,
113
114     /**
115      * State where the socket is closed on our side and waiting to be ACK'ed
116      */
117     STATE_TRANSMIT_CLOSE_WAIT,
118
119     /**
120      * State where the socket is closed for writing
121      */
122     STATE_TRANSMIT_CLOSED,
123
124     /**
125      * State where the socket is closed on our side and waiting to be ACK'ed
126      */
127     STATE_CLOSE_WAIT,
128
129     /**
130      * State where the socket is closed
131      */
132     STATE_CLOSED 
133   };
134
135
136 /**
137  * Functions of this type are called when a message is written
138  *
139  * @param cls the closure from queue_message
140  * @param socket the socket the written message was bound to
141  */
142 typedef void (*SendFinishCallback) (void *cls,
143                                     struct GNUNET_STREAM_Socket *socket);
144
145
146 /**
147  * The send message queue
148  */
149 struct MessageQueue
150 {
151   /**
152    * The message
153    */
154   struct GNUNET_STREAM_MessageHeader *message;
155
156   /**
157    * Callback to be called when the message is sent
158    */
159   SendFinishCallback finish_cb;
160
161   /**
162    * The closure for finish_cb
163    */
164   void *finish_cb_cls;
165
166   /**
167    * The next message in queue. Should be NULL in the last message
168    */
169   struct MessageQueue *next;
170
171   /**
172    * The next message in queue. Should be NULL in the first message
173    */
174   struct MessageQueue *prev;
175 };
176
177
178 /**
179  * The STREAM Socket Handler
180  */
181 struct GNUNET_STREAM_Socket
182 {
183   /**
184    * Retransmission timeout
185    */
186   struct GNUNET_TIME_Relative retransmit_timeout;
187
188   /**
189    * The Acknowledgement Bitmap
190    */
191   GNUNET_STREAM_AckBitmap ack_bitmap;
192
193   /**
194    * Time when the Acknowledgement was queued
195    */
196   struct GNUNET_TIME_Absolute ack_time_registered;
197
198   /**
199    * Queued Acknowledgement deadline
200    */
201   struct GNUNET_TIME_Relative ack_time_deadline;
202
203   /**
204    * The mesh handle
205    */
206   struct GNUNET_MESH_Handle *mesh;
207
208   /**
209    * The mesh tunnel handle
210    */
211   struct GNUNET_MESH_Tunnel *tunnel;
212
213   /**
214    * Stream open closure
215    */
216   void *open_cls;
217
218   /**
219    * Stream open callback
220    */
221   GNUNET_STREAM_OpenCallback open_cb;
222
223   /**
224    * The current transmit handle (if a pending transmit request exists)
225    */
226   struct GNUNET_MESH_TransmitHandle *transmit_handle;
227
228   /**
229    * The current act transmit handle (if a pending ack transmit request exists)
230    */
231   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
232
233   /**
234    * Pointer to the current ack message using in ack_task
235    */
236   struct GNUNET_STREAM_AckMessage *ack_msg;
237
238   /**
239    * The current message associated with the transmit handle
240    */
241   struct MessageQueue *queue_head;
242
243   /**
244    * The queue tail, should always point to the last message in queue
245    */
246   struct MessageQueue *queue_tail;
247
248   /**
249    * The write IO_handle associated with this socket
250    */
251   struct GNUNET_STREAM_IOWriteHandle *write_handle;
252
253   /**
254    * The read IO_handle associated with this socket
255    */
256   struct GNUNET_STREAM_IOReadHandle *read_handle;
257
258   /**
259    * The shutdown handle associated with this socket
260    */
261   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
262
263   /**
264    * Buffer for storing received messages
265    */
266   void *receive_buffer;
267
268   /**
269    * The listen socket from which this socket is derived. Should be NULL if it
270    * is not a derived socket
271    */
272   struct GNUNET_STREAM_ListenSocket *lsocket;
273
274   /**
275    * The peer identity of the peer at the other end of the stream
276    */
277   struct GNUNET_PeerIdentity other_peer;
278
279   /**
280    * Task identifier for the read io timeout task
281    */
282   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
283
284   /**
285    * Task identifier for retransmission task after timeout
286    */
287   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
288
289   /**
290    * Task identifier for retransmission of control messages
291    */
292   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
293
294   /**
295    * The task for sending timely Acks
296    */
297   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
298
299   /**
300    * Task scheduled to continue a read operation.
301    */
302   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
303
304   /**
305    * The state of the protocol associated with this socket
306    */
307   enum State state;
308
309   /**
310    * The status of the socket
311    */
312   enum GNUNET_STREAM_Status status;
313
314   /**
315    * The number of previous timeouts; FIXME: currently not used
316    */
317   unsigned int retries;
318
319   /**
320    * The application port number (type: uint32_t)
321    */
322   GNUNET_MESH_ApplicationType app_port;
323
324   /**
325    * Whether testing mode is active or not
326    */
327   int testing_active;
328
329   /**
330    * The write sequence number to be set incase of testing
331    */
332   uint32_t testing_set_write_sequence_number_value;
333
334   /**
335    * The session id associated with this stream connection
336    * FIXME: Not used currently, may be removed
337    */
338   uint32_t session_id;
339
340   /**
341    * Write sequence number. Set to random when sending HELLO(client) and
342    * HELLO_ACK(server) 
343    */
344   uint32_t write_sequence_number;
345
346   /**
347    * Read sequence number. This number's value is determined during handshake
348    */
349   uint32_t read_sequence_number;
350
351   /**
352    * The receiver buffer size
353    */
354   uint32_t receive_buffer_size;
355
356   /**
357    * The receiver buffer boundaries
358    */
359   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
360
361   /**
362    * receiver's available buffer after the last acknowledged packet
363    */
364   uint32_t receiver_window_available;
365
366   /**
367    * The offset pointer used during write operation
368    */
369   uint32_t write_offset;
370
371   /**
372    * The offset after which we are expecting data
373    */
374   uint32_t read_offset;
375
376   /**
377    * The offset upto which user has read from the received buffer
378    */
379   uint32_t copy_offset;
380 };
381
382
383 /**
384  * A socket for listening
385  */
386 struct GNUNET_STREAM_ListenSocket
387 {
388   /**
389    * The mesh handle
390    */
391   struct GNUNET_MESH_Handle *mesh;
392
393   /**
394    * Our configuration
395    */
396   struct GNUNET_CONFIGURATION_Handle *cfg;
397
398   /**
399    * Handle to the lock manager service
400    */
401   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
402
403   /**
404    * The active LockingRequest from lockmanager
405    */
406   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
407
408   /**
409    * Callback to call after acquring a lock and listening
410    */
411   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
412
413   /**
414    * The callback function which is called after successful opening socket
415    */
416   GNUNET_STREAM_ListenCallback listen_cb;
417
418   /**
419    * The call back closure
420    */
421   void *listen_cb_cls;
422
423   /**
424    * The service port
425    */
426   GNUNET_MESH_ApplicationType port;
427   
428   /**
429    * The id of the lockmanager timeout task
430    */
431   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
432
433   /**
434    * The retransmit timeout
435    */
436   struct GNUNET_TIME_Relative retransmit_timeout;
437   
438   /**
439    * Listen enabled?
440    */
441   int listening;
442
443   /**
444    * Whether testing mode is active or not
445    */
446   int testing_active;
447
448   /**
449    * The write sequence number to be set incase of testing
450    */
451   uint32_t testing_set_write_sequence_number_value;
452 };
453
454
455 /**
456  * The IO Write Handle
457  */
458 struct GNUNET_STREAM_IOWriteHandle
459 {
460   /**
461    * The socket to which this write handle is associated
462    */
463   struct GNUNET_STREAM_Socket *socket;
464
465   /**
466    * The packet_buffers associated with this Handle
467    */
468   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
469
470   /**
471    * The write continuation callback
472    */
473   GNUNET_STREAM_CompletionContinuation write_cont;
474
475   /**
476    * Write continuation closure
477    */
478   void *write_cont_cls;
479
480   /**
481    * The bitmap of this IOHandle; Corresponding bit for a message is set when
482    * it has been acknowledged by the receiver
483    */
484   GNUNET_STREAM_AckBitmap ack_bitmap;
485
486   /**
487    * Number of bytes in this write handle
488    */
489   size_t size;
490 };
491
492
493 /**
494  * The IO Read Handle
495  */
496 struct GNUNET_STREAM_IOReadHandle
497 {
498   /**
499    * The socket to which this read handle is associated
500    */
501   struct GNUNET_STREAM_Socket *socket;
502   
503   /**
504    * Callback for the read processor
505    */
506   GNUNET_STREAM_DataProcessor proc;
507
508   /**
509    * The closure pointer for the read processor callback
510    */
511   void *proc_cls;
512 };
513
514
515 /**
516  * Handle for Shutdown
517  */
518 struct GNUNET_STREAM_ShutdownHandle
519 {
520   /**
521    * The socket associated with this shutdown handle
522    */
523   struct GNUNET_STREAM_Socket *socket;
524
525   /**
526    * Shutdown completion callback
527    */
528   GNUNET_STREAM_ShutdownCompletion completion_cb;
529
530   /**
531    * Closure for completion callback
532    */
533   void *completion_cls;
534
535   /**
536    * Close message retransmission task id
537    */
538   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
539
540   /**
541    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
542    */
543   int operation;  
544 };
545
546
547 /**
548  * Default value in seconds for various timeouts
549  */
550 static const unsigned int default_timeout = 10;
551
552 /**
553  * The domain name for locks we use here
554  */
555 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
556
557
558 /**
559  * Callback function for sending queued message
560  *
561  * @param cls closure the socket
562  * @param size number of bytes available in buf
563  * @param buf where the callee should write the message
564  * @return number of bytes written to buf
565  */
566 static size_t
567 send_message_notify (void *cls, size_t size, void *buf)
568 {
569   struct GNUNET_STREAM_Socket *socket = cls;
570   struct MessageQueue *head;
571   size_t ret;
572
573   socket->transmit_handle = NULL; /* Remove the transmit handle */
574   head = socket->queue_head;
575   if (NULL == head)
576     return 0; /* just to be safe */
577   if (0 == size)                /* request timed out */
578   {
579     socket->retries++;
580     LOG (GNUNET_ERROR_TYPE_DEBUG,
581          "%s: Message sending timed out. Retry %d \n",
582          GNUNET_i2s (&socket->other_peer),
583          socket->retries);
584     socket->transmit_handle = 
585       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
586                                          0, /* Corking */
587                                          1, /* Priority */
588                                          /* FIXME: exponential backoff */
589                                          socket->retransmit_timeout,
590                                          &socket->other_peer,
591                                          ntohs (head->message->header.size),
592                                          &send_message_notify,
593                                          socket);
594     return 0;
595   }
596   ret = ntohs (head->message->header.size);
597   GNUNET_assert (size >= ret);
598   memcpy (buf, head->message, ret);
599   if (NULL != head->finish_cb)
600   {
601     head->finish_cb (head->finish_cb_cls, socket);
602   }
603   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
604                                socket->queue_tail,
605                                head);
606   GNUNET_free (head->message);
607   GNUNET_free (head);
608   head = socket->queue_head;
609   if (NULL != head)    /* more pending messages to send */
610   {
611     socket->retries = 0;
612     socket->transmit_handle = 
613       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
614                                          0, /* Corking */
615                                          1, /* Priority */
616                                          /* FIXME: exponential backoff */
617                                          socket->retransmit_timeout,
618                                          &socket->other_peer,
619                                          ntohs (head->message->header.size),
620                                          &send_message_notify,
621                                          socket);
622   }
623   return ret;
624 }
625
626
627 /**
628  * Queues a message for sending using the mesh connection of a socket
629  *
630  * @param socket the socket whose mesh connection is used
631  * @param message the message to be sent
632  * @param finish_cb the callback to be called when the message is sent
633  * @param finish_cb_cls the closure for the callback
634  */
635 static void
636 queue_message (struct GNUNET_STREAM_Socket *socket,
637                struct GNUNET_STREAM_MessageHeader *message,
638                SendFinishCallback finish_cb,
639                void *finish_cb_cls)
640 {
641   struct MessageQueue *queue_entity;
642
643   GNUNET_assert 
644     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
645      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
646
647   LOG (GNUNET_ERROR_TYPE_DEBUG,
648        "%s: Queueing message of type %d and size %d\n",
649        GNUNET_i2s (&socket->other_peer),
650        ntohs (message->header.type),
651        ntohs (message->header.size));
652   GNUNET_assert (NULL != message);
653   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
654   queue_entity->message = message;
655   queue_entity->finish_cb = finish_cb;
656   queue_entity->finish_cb_cls = finish_cb_cls;
657   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
658                                     socket->queue_tail,
659                                     queue_entity);
660   if (NULL == socket->transmit_handle)
661   {
662     socket->retries = 0;
663     socket->transmit_handle = 
664       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
665                                          0, /* Corking */
666                                          1, /* Priority */
667                                          socket->retransmit_timeout,
668                                          &socket->other_peer,
669                                          ntohs (message->header.size),
670                                          &send_message_notify,
671                                          socket);
672   }
673 }
674
675
676 /**
677  * Copies a message and queues it for sending using the mesh connection of
678  * given socket 
679  *
680  * @param socket the socket whose mesh connection is used
681  * @param message the message to be sent
682  * @param finish_cb the callback to be called when the message is sent
683  * @param finish_cb_cls the closure for the callback
684  */
685 static void
686 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
687                         const struct GNUNET_STREAM_MessageHeader *message,
688                         SendFinishCallback finish_cb,
689                         void *finish_cb_cls)
690 {
691   struct GNUNET_STREAM_MessageHeader *msg_copy;
692   uint16_t size;
693   
694   size = ntohs (message->header.size);
695   msg_copy = GNUNET_malloc (size);
696   memcpy (msg_copy, message, size);
697   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
698 }
699
700
701 /**
702  * Callback function for sending ack message
703  *
704  * @param cls closure the ACK message created in ack_task
705  * @param size number of bytes available in buffer
706  * @param buf where the callee should write the message
707  * @return number of bytes written to buf
708  */
709 static size_t
710 send_ack_notify (void *cls, size_t size, void *buf)
711 {
712   struct GNUNET_STREAM_Socket *socket = cls;
713
714   if (0 == size)
715   {
716     LOG (GNUNET_ERROR_TYPE_DEBUG,
717          "%s called with size 0\n", __func__);
718     return 0;
719   }
720   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
721   
722   size = ntohs (socket->ack_msg->header.header.size);
723   memcpy (buf, socket->ack_msg, size);
724   
725   GNUNET_free (socket->ack_msg);
726   socket->ack_msg = NULL;
727   socket->ack_transmit_handle = NULL;
728   return size;
729 }
730
731
732 /**
733  * Writes data using the given socket. The amount of data written is limited by
734  * the receiver_window_size
735  *
736  * @param socket the socket to use
737  */
738 static void 
739 write_data (struct GNUNET_STREAM_Socket *socket);
740
741
742 /**
743  * Task for retransmitting data messages if they aren't ACK before their ack
744  * deadline 
745  *
746  * @param cls the socket
747  * @param tc the Task context
748  */
749 static void
750 data_retransmission_task (void *cls,
751                           const struct GNUNET_SCHEDULER_TaskContext *tc)
752 {
753   struct GNUNET_STREAM_Socket *socket = cls;
754   
755   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
756     return;
757   LOG (GNUNET_ERROR_TYPE_DEBUG,
758        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
759   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
760   write_data (socket);
761 }
762
763
764 /**
765  * Task for sending ACK message
766  *
767  * @param cls the socket
768  * @param tc the Task context
769  */
770 static void
771 ack_task (void *cls,
772           const struct GNUNET_SCHEDULER_TaskContext *tc)
773 {
774   struct GNUNET_STREAM_Socket *socket = cls;
775   struct GNUNET_STREAM_AckMessage *ack_msg;
776
777   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
778   {
779     return;
780   }
781   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
782   /* Create the ACK Message */
783   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
784   ack_msg->header.header.size = htons (sizeof (struct 
785                                                GNUNET_STREAM_AckMessage));
786   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
787   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
788   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
789   ack_msg->receive_window_remaining = 
790     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
791   socket->ack_msg = ack_msg;
792   /* Request MESH for sending ACK */
793   socket->ack_transmit_handle = 
794     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
795                                        0, /* Corking */
796                                        1, /* Priority */
797                                        socket->retransmit_timeout,
798                                        &socket->other_peer,
799                                        ntohs (ack_msg->header.header.size),
800                                        &send_ack_notify,
801                                        socket);
802 }
803
804
805 /**
806  * Retransmission task for shutdown messages
807  *
808  * @param cls the shutdown handle
809  * @param tc the Task Context
810  */
811 static void
812 close_msg_retransmission_task (void *cls,
813                                const struct GNUNET_SCHEDULER_TaskContext *tc)
814 {
815   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
816   struct GNUNET_STREAM_MessageHeader *msg;
817   struct GNUNET_STREAM_Socket *socket;
818
819   GNUNET_assert (NULL != shutdown_handle);
820   socket = shutdown_handle->socket;
821
822   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
823   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
824   switch (shutdown_handle->operation)
825   {
826   case SHUT_RDWR:
827     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
828     break;
829   case SHUT_RD:
830     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
831     break;
832   case SHUT_WR:
833     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
834     break;
835   default:
836     GNUNET_free (msg);
837     shutdown_handle->close_msg_retransmission_task_id = 
838       GNUNET_SCHEDULER_NO_TASK;
839     return;
840   }
841   queue_message (socket, msg, NULL, NULL);
842   shutdown_handle->close_msg_retransmission_task_id =
843     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
844                                   &close_msg_retransmission_task,
845                                   shutdown_handle);
846 }
847
848
849 /**
850  * Function to modify a bit in GNUNET_STREAM_AckBitmap
851  *
852  * @param bitmap the bitmap to modify
853  * @param bit the bit number to modify
854  * @param value GNUNET_YES to on, GNUNET_NO to off
855  */
856 static void
857 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
858                       unsigned int bit, 
859                       int value)
860 {
861   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
862   if (GNUNET_YES == value)
863     *bitmap |= (1LL << bit);
864   else
865     *bitmap &= ~(1LL << bit);
866 }
867
868
869 /**
870  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
871  *
872  * @param bitmap address of the bitmap that has to be checked
873  * @param bit the bit number to check
874  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
875  */
876 static uint8_t
877 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
878                       unsigned int bit)
879 {
880   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
881   return 0 != (*bitmap & (1LL << bit));
882 }
883
884
885 /**
886  * Writes data using the given socket. The amount of data written is limited by
887  * the receiver_window_size
888  *
889  * @param socket the socket to use
890  */
891 static void 
892 write_data (struct GNUNET_STREAM_Socket *socket)
893 {
894   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
895   int packet;                   /* Although an int, should never be negative */
896   int ack_packet;
897
898   ack_packet = -1;
899   /* Find the last acknowledged packet */
900   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
901   {      
902     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
903                                             packet))
904       ack_packet = packet;        
905     else if (NULL == io_handle->messages[packet])
906       break;
907   }
908   /* Resend packets which weren't ack'ed */
909   for (packet=0; packet < ack_packet; packet++)
910   {
911     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
912                                            packet))
913     {
914       LOG (GNUNET_ERROR_TYPE_DEBUG,
915            "%s: Placing DATA message with sequence %u in send queue\n",
916            GNUNET_i2s (&socket->other_peer),
917            ntohl (io_handle->messages[packet]->sequence_number));
918       copy_and_queue_message (socket,
919                               &io_handle->messages[packet]->header,
920                               NULL,
921                               NULL);
922     }
923   }
924   packet = ack_packet + 1;
925   /* Now send new packets if there is enough buffer space */
926   while ( (NULL != io_handle->messages[packet]) &&
927           (socket->receiver_window_available 
928            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
929           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
930   {
931     socket->receiver_window_available -= 
932       ntohs (io_handle->messages[packet]->header.header.size);
933     LOG (GNUNET_ERROR_TYPE_DEBUG,
934          "%s: Placing DATA message with sequence %u in send queue\n",
935          GNUNET_i2s (&socket->other_peer),
936          ntohl (io_handle->messages[packet]->sequence_number));
937     copy_and_queue_message (socket,
938                             &io_handle->messages[packet]->header,
939                             NULL,
940                             NULL);
941     packet++;
942   }
943   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
944     socket->data_retransmission_task_id = 
945       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
946                                     (GNUNET_TIME_UNIT_SECONDS, 8),
947                                     &data_retransmission_task,
948                                     socket);
949 }
950
951
952 /**
953  * Task for calling the read processor
954  *
955  * @param cls the socket
956  * @param tc the task context
957  */
958 static void
959 call_read_processor (void *cls,
960                      const struct GNUNET_SCHEDULER_TaskContext *tc)
961 {
962   struct GNUNET_STREAM_Socket *socket = cls;
963   size_t read_size;
964   size_t valid_read_size;
965   unsigned int packet;
966   uint32_t sequence_increase;
967   uint32_t offset_increase;
968
969   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
970   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
971     return;
972
973   if (NULL == socket->receive_buffer) 
974     return;
975
976   GNUNET_assert (NULL != socket->read_handle);
977   GNUNET_assert (NULL != socket->read_handle->proc);
978
979   /* Check the bitmap for any holes */
980   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
981   {
982     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
983                                            packet))
984       break;
985   }
986   /* We only call read processor if we have the first packet */
987   GNUNET_assert (0 < packet);
988   valid_read_size = 
989     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
990   GNUNET_assert (0 != valid_read_size);
991   /* Cancel the read_io_timeout_task */
992   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
993   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
994   /* Call the data processor */
995   LOG (GNUNET_ERROR_TYPE_DEBUG,
996        "%s: Calling read processor\n",
997        GNUNET_i2s (&socket->other_peer));
998   read_size = 
999     socket->read_handle->proc (socket->read_handle->proc_cls,
1000                                socket->status,
1001                                socket->receive_buffer + socket->copy_offset,
1002                                valid_read_size);
1003   LOG (GNUNET_ERROR_TYPE_DEBUG,
1004        "%s: Read processor read %d bytes\n",
1005        GNUNET_i2s (&socket->other_peer), read_size);
1006   LOG (GNUNET_ERROR_TYPE_DEBUG,
1007        "%s: Read processor completed successfully\n",
1008        GNUNET_i2s (&socket->other_peer));
1009   /* Free the read handle */
1010   GNUNET_free (socket->read_handle);
1011   socket->read_handle = NULL;
1012   GNUNET_assert (read_size <= valid_read_size);
1013   socket->copy_offset += read_size;
1014   /* Determine upto which packet we can remove from the buffer */
1015   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1016   {
1017     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1018     { packet++; break; }
1019     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1020       break;
1021   }
1022
1023   /* If no packets can be removed we can't move the buffer */
1024   if (0 == packet) return;
1025   sequence_increase = packet;
1026   LOG (GNUNET_ERROR_TYPE_DEBUG,
1027        "%s: Sequence increase after read processor completion: %u\n",
1028        GNUNET_i2s (&socket->other_peer), sequence_increase);
1029
1030   /* Shift the data in the receive buffer */
1031   socket->receive_buffer = 
1032     memmove (socket->receive_buffer,
1033              socket->receive_buffer 
1034              + socket->receive_buffer_boundaries[sequence_increase-1],
1035              socket->receive_buffer_size
1036              - socket->receive_buffer_boundaries[sequence_increase-1]);
1037   /* Shift the bitmap */
1038   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1039   /* Set read_sequence_number */
1040   socket->read_sequence_number += sequence_increase;
1041   /* Set read_offset */
1042   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1043   socket->read_offset += offset_increase;
1044   /* Fix copy_offset */
1045   GNUNET_assert (offset_increase <= socket->copy_offset);
1046   socket->copy_offset -= offset_increase;
1047   /* Fix relative boundaries */
1048   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1049   {
1050     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1051     {
1052       uint32_t ahead_buffer_boundary;
1053
1054       ahead_buffer_boundary = 
1055         socket->receive_buffer_boundaries[packet + sequence_increase];
1056       if (0 == ahead_buffer_boundary)
1057         socket->receive_buffer_boundaries[packet] = 0;
1058       else
1059       {
1060         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1061         socket->receive_buffer_boundaries[packet] = 
1062           ahead_buffer_boundary - offset_increase;
1063       }
1064     }
1065     else
1066       socket->receive_buffer_boundaries[packet] = 0;
1067   }
1068 }
1069
1070
1071 /**
1072  * Cancels the existing read io handle
1073  *
1074  * @param cls the closure from the SCHEDULER call
1075  * @param tc the task context
1076  */
1077 static void
1078 read_io_timeout (void *cls,
1079                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1080 {
1081   struct GNUNET_STREAM_Socket *socket = cls;
1082   GNUNET_STREAM_DataProcessor proc;
1083   void *proc_cls;
1084
1085   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1086   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1087   {
1088     LOG (GNUNET_ERROR_TYPE_DEBUG,
1089          "%s: Read task timedout - Cancelling it\n",
1090          GNUNET_i2s (&socket->other_peer));
1091     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1092     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1093   }
1094   GNUNET_assert (NULL != socket->read_handle);
1095   proc = socket->read_handle->proc;
1096   proc_cls = socket->read_handle->proc_cls;
1097   GNUNET_free (socket->read_handle);
1098   socket->read_handle = NULL;
1099   /* Call the read processor to signal timeout */
1100   proc (proc_cls,
1101         GNUNET_STREAM_TIMEOUT,
1102         NULL,
1103         0);
1104 }
1105
1106
1107 /**
1108  * Handler for DATA messages; Same for both client and server
1109  *
1110  * @param socket the socket through which the ack was received
1111  * @param tunnel connection to the other end
1112  * @param sender who sent the message
1113  * @param msg the data message
1114  * @param atsi performance data for the connection
1115  * @return GNUNET_OK to keep the connection open,
1116  *         GNUNET_SYSERR to close it (signal serious error)
1117  */
1118 static int
1119 handle_data (struct GNUNET_STREAM_Socket *socket,
1120              struct GNUNET_MESH_Tunnel *tunnel,
1121              const struct GNUNET_PeerIdentity *sender,
1122              const struct GNUNET_STREAM_DataMessage *msg,
1123              const struct GNUNET_ATS_Information*atsi)
1124 {
1125   const void *payload;
1126   uint32_t bytes_needed;
1127   uint32_t relative_offset;
1128   uint32_t relative_sequence_number;
1129   uint16_t size;
1130
1131   size = htons (msg->header.header.size);
1132   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1133   {
1134     GNUNET_break_op (0);
1135     return GNUNET_SYSERR;
1136   }
1137
1138   if (0 != memcmp (sender,
1139                    &socket->other_peer,
1140                    sizeof (struct GNUNET_PeerIdentity)))
1141   {
1142     LOG (GNUNET_ERROR_TYPE_DEBUG,
1143          "%s: Received DATA from non-confirming peer\n",
1144          GNUNET_i2s (&socket->other_peer));
1145     return GNUNET_YES;
1146   }
1147
1148   switch (socket->state)
1149   {
1150   case STATE_ESTABLISHED:
1151   case STATE_TRANSMIT_CLOSED:
1152   case STATE_TRANSMIT_CLOSE_WAIT:
1153       
1154     /* check if the message's sequence number is in the range we are
1155        expecting */
1156     relative_sequence_number = 
1157       ntohl (msg->sequence_number) - socket->read_sequence_number;
1158     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1159     {
1160       LOG (GNUNET_ERROR_TYPE_DEBUG,
1161            "%s: Ignoring received message with sequence number %u\n",
1162            GNUNET_i2s (&socket->other_peer),
1163            ntohl (msg->sequence_number));
1164       /* Start ACK sending task if one is not already present */
1165       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1166       {
1167         socket->ack_task_id = 
1168           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1169                                         (msg->ack_deadline),
1170                                         &ack_task,
1171                                         socket);
1172       }
1173       return GNUNET_YES;
1174     }
1175       
1176     /* Check if we have already seen this message */
1177     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1178                                             relative_sequence_number))
1179     {
1180       LOG (GNUNET_ERROR_TYPE_DEBUG,
1181            "%s: Ignoring already received message with sequence number %u\n",
1182            GNUNET_i2s (&socket->other_peer),
1183            ntohl (msg->sequence_number));
1184       /* Start ACK sending task if one is not already present */
1185       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1186       {
1187         socket->ack_task_id = 
1188           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1189                                         (msg->ack_deadline),
1190                                         &ack_task,
1191                                         socket);
1192       }
1193       return GNUNET_YES;
1194     }
1195
1196     LOG (GNUNET_ERROR_TYPE_DEBUG,
1197          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1198          GNUNET_i2s (&socket->other_peer),
1199          ntohl (msg->sequence_number),
1200          ntohs (msg->header.header.size),
1201          GNUNET_i2s (&socket->other_peer));
1202       
1203     /* Check if we have to allocate the buffer */
1204     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1205     relative_offset = ntohl (msg->offset) - socket->read_offset;
1206     bytes_needed = relative_offset + size;
1207     if (bytes_needed > socket->receive_buffer_size)
1208     {
1209       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1210       {
1211         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1212                                                  bytes_needed);
1213         socket->receive_buffer_size = bytes_needed;
1214       }
1215       else
1216       {
1217         LOG (GNUNET_ERROR_TYPE_DEBUG,
1218              "%s: Cannot accommodate packet %d as buffer is full\n",
1219              GNUNET_i2s (&socket->other_peer),
1220              ntohl (msg->sequence_number));
1221         return GNUNET_YES;
1222       }
1223     }
1224       
1225     /* Copy Data to buffer */
1226     payload = &msg[1];
1227     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1228     memcpy (socket->receive_buffer + relative_offset,
1229             payload,
1230             size);
1231     socket->receive_buffer_boundaries[relative_sequence_number] = 
1232       relative_offset + size;
1233       
1234     /* Modify the ACK bitmap */
1235     ackbitmap_modify_bit (&socket->ack_bitmap,
1236                           relative_sequence_number,
1237                           GNUNET_YES);
1238
1239     /* Start ACK sending task if one is not already present */
1240     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1241     {
1242       socket->ack_task_id = 
1243         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1244                                       (msg->ack_deadline),
1245                                       &ack_task,
1246                                       socket);
1247     }
1248
1249     if ((NULL != socket->read_handle) /* A read handle is waiting */
1250         /* There is no current read task */
1251         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1252         /* We have the first packet */
1253         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1254                                                0)))
1255     {
1256       LOG (GNUNET_ERROR_TYPE_DEBUG,
1257            "%s: Scheduling read processor\n",
1258            GNUNET_i2s (&socket->other_peer));
1259           
1260       socket->read_task_id = 
1261         GNUNET_SCHEDULER_add_now (&call_read_processor,
1262                                   socket);
1263     }
1264       
1265     break;
1266
1267   default:
1268     LOG (GNUNET_ERROR_TYPE_DEBUG,
1269          "%s: Received data message when it cannot be handled\n",
1270          GNUNET_i2s (&socket->other_peer));
1271     break;
1272   }
1273   return GNUNET_YES;
1274 }
1275
1276
1277 /**
1278  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1279  *
1280  * @param cls the socket (set from GNUNET_MESH_connect)
1281  * @param tunnel connection to the other end
1282  * @param tunnel_ctx place to store local state associated with the tunnel
1283  * @param sender who sent the message
1284  * @param message the actual message
1285  * @param atsi performance data for the connection
1286  * @return GNUNET_OK to keep the connection open,
1287  *         GNUNET_SYSERR to close it (signal serious error)
1288  */
1289 static int
1290 client_handle_data (void *cls,
1291                     struct GNUNET_MESH_Tunnel *tunnel,
1292                     void **tunnel_ctx,
1293                     const struct GNUNET_PeerIdentity *sender,
1294                     const struct GNUNET_MessageHeader *message,
1295                     const struct GNUNET_ATS_Information*atsi)
1296 {
1297   struct GNUNET_STREAM_Socket *socket = cls;
1298
1299   return handle_data (socket, 
1300                       tunnel, 
1301                       sender, 
1302                       (const struct GNUNET_STREAM_DataMessage *) message, 
1303                       atsi);
1304 }
1305
1306
1307 /**
1308  * Callback to set state to ESTABLISHED
1309  *
1310  * @param cls the closure NULL;
1311  * @param socket the socket to requiring state change
1312  */
1313 static void
1314 set_state_established (void *cls,
1315                        struct GNUNET_STREAM_Socket *socket)
1316 {
1317   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1318        "%s: Attaining ESTABLISHED state\n",
1319        GNUNET_i2s (&socket->other_peer));
1320   socket->write_offset = 0;
1321   socket->read_offset = 0;
1322   socket->state = STATE_ESTABLISHED;
1323   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1324                  socket->control_retransmission_task_id);
1325   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1326   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1327   if (NULL != socket->lsocket)
1328   {
1329     LOG (GNUNET_ERROR_TYPE_DEBUG,
1330          "%s: Calling listen callback\n",
1331          GNUNET_i2s (&socket->other_peer));
1332     if (GNUNET_SYSERR == 
1333         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1334                                     socket,
1335                                     &socket->other_peer))
1336     {
1337       socket->state = STATE_CLOSED;
1338       /* FIXME: We should close in a decent way (send RST) */
1339       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1340       GNUNET_free (socket);
1341     }
1342   }
1343   else
1344     socket->open_cb (socket->open_cls, socket);
1345 }
1346
1347
1348 /**
1349  * Callback to set state to HELLO_WAIT
1350  *
1351  * @param cls the closure from queue_message
1352  * @param socket the socket to requiring state change
1353  */
1354 static void
1355 set_state_hello_wait (void *cls,
1356                       struct GNUNET_STREAM_Socket *socket)
1357 {
1358   GNUNET_assert (STATE_INIT == socket->state);
1359   LOG (GNUNET_ERROR_TYPE_DEBUG,
1360        "%s: Attaining HELLO_WAIT state\n",
1361        GNUNET_i2s (&socket->other_peer));
1362   socket->state = STATE_HELLO_WAIT;
1363 }
1364
1365
1366 /**
1367  * Callback to set state to CLOSE_WAIT
1368  *
1369  * @param cls the closure from queue_message
1370  * @param socket the socket requiring state change
1371  */
1372 static void
1373 set_state_close_wait (void *cls,
1374                       struct GNUNET_STREAM_Socket *socket)
1375 {
1376   LOG (GNUNET_ERROR_TYPE_DEBUG,
1377        "%s: Attaing CLOSE_WAIT state\n",
1378        GNUNET_i2s (&socket->other_peer));
1379   socket->state = STATE_CLOSE_WAIT;
1380   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1381   socket->receive_buffer = NULL;
1382   socket->receive_buffer_size = 0;
1383 }
1384
1385
1386 /**
1387  * Callback to set state to RECEIVE_CLOSE_WAIT
1388  *
1389  * @param cls the closure from queue_message
1390  * @param socket the socket requiring state change
1391  */
1392 static void
1393 set_state_receive_close_wait (void *cls,
1394                               struct GNUNET_STREAM_Socket *socket)
1395 {
1396   LOG (GNUNET_ERROR_TYPE_DEBUG,
1397        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1398        GNUNET_i2s (&socket->other_peer));
1399   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1400   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1401   socket->receive_buffer = NULL;
1402   socket->receive_buffer_size = 0;
1403 }
1404
1405
1406 /**
1407  * Callback to set state to TRANSMIT_CLOSE_WAIT
1408  *
1409  * @param cls the closure from queue_message
1410  * @param socket the socket requiring state change
1411  */
1412 static void
1413 set_state_transmit_close_wait (void *cls,
1414                                struct GNUNET_STREAM_Socket *socket)
1415 {
1416   LOG (GNUNET_ERROR_TYPE_DEBUG,
1417        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1418        GNUNET_i2s (&socket->other_peer));
1419   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1420 }
1421
1422
1423 /**
1424  * Callback to set state to CLOSED
1425  *
1426  * @param cls the closure from queue_message
1427  * @param socket the socket requiring state change
1428  */
1429 static void
1430 set_state_closed (void *cls,
1431                   struct GNUNET_STREAM_Socket *socket)
1432 {
1433   socket->state = STATE_CLOSED;
1434 }
1435
1436
1437 /**
1438  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1439  *
1440  * @return the generate hello message
1441  */
1442 static struct GNUNET_STREAM_MessageHeader *
1443 generate_hello (void)
1444 {
1445   struct GNUNET_STREAM_MessageHeader *msg;
1446
1447   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1448   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1449   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1450   return msg;
1451 }
1452
1453
1454 /**
1455  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1456  * socket
1457  *
1458  * @param socket the socket for which this HelloAckMessage has to be generated
1459  * @param generate_seq GNUNET_YES to generate the write sequence number,
1460  *          GNUNET_NO to use the existing sequence number
1461  * @return the HelloAckMessage
1462  */
1463 static struct GNUNET_STREAM_HelloAckMessage *
1464 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1465                     int generate_seq)
1466 {
1467   struct GNUNET_STREAM_HelloAckMessage *msg;
1468
1469   if (GNUNET_YES == generate_seq)
1470   {
1471     if (GNUNET_YES == socket->testing_active)
1472       socket->write_sequence_number =
1473         socket->testing_set_write_sequence_number_value;
1474     else
1475       socket->write_sequence_number = 
1476         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1477     LOG_DEBUG ("%s: write sequence number %u\n",
1478                GNUNET_i2s (&socket->other_peer),
1479                (unsigned int) socket->write_sequence_number);
1480   }
1481   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1482   msg->header.header.size = 
1483     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1484   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1485   msg->sequence_number = htonl (socket->write_sequence_number);
1486   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1487   return msg;
1488 }
1489
1490
1491 /**
1492  * Task for retransmitting control messages if they aren't ACK'ed before a
1493  * deadline
1494  *
1495  * @param cls the socket
1496  * @param tc the Task context
1497  */
1498 static void
1499 control_retransmission_task (void *cls,
1500                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1501 {
1502   struct GNUNET_STREAM_Socket *socket = cls;
1503     
1504   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1505     return;
1506   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1507   LOG_DEBUG ("%s: Retransmitting a control message\n",
1508                  GNUNET_i2s (&socket->other_peer));
1509   switch (socket->status)
1510   {
1511   case STATE_INIT:    
1512     GNUNET_break (0);
1513     break;
1514   case STATE_LISTEN:
1515     GNUNET_break (0);
1516     break;
1517   case STATE_HELLO_WAIT:
1518     if (NULL == socket->lsocket) /* We are client */
1519       queue_message (socket, generate_hello (), NULL, NULL);
1520     else
1521       queue_message (socket,
1522                      (struct GNUNET_STREAM_MessageHeader *)
1523                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
1524     socket->control_retransmission_task_id =
1525     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1526                                   &control_retransmission_task, socket);
1527     break;
1528   case STATE_ESTABLISHED:
1529     if (NULL == socket->lsocket)
1530       queue_message (socket,
1531                      (struct GNUNET_STREAM_MessageHeader *)
1532                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
1533     else
1534       GNUNET_break (0);
1535   default:
1536     GNUNET_break (0);
1537   }  
1538 }
1539
1540
1541 /**
1542  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1543  *
1544  * @param cls the socket (set from GNUNET_MESH_connect)
1545  * @param tunnel connection to the other end
1546  * @param tunnel_ctx this is NULL
1547  * @param sender who sent the message
1548  * @param message the actual message
1549  * @param atsi performance data for the connection
1550  * @return GNUNET_OK to keep the connection open,
1551  *         GNUNET_SYSERR to close it (signal serious error)
1552  */
1553 static int
1554 client_handle_hello_ack (void *cls,
1555                          struct GNUNET_MESH_Tunnel *tunnel,
1556                          void **tunnel_ctx,
1557                          const struct GNUNET_PeerIdentity *sender,
1558                          const struct GNUNET_MessageHeader *message,
1559                          const struct GNUNET_ATS_Information*atsi)
1560 {
1561   struct GNUNET_STREAM_Socket *socket = cls;
1562   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1563   struct GNUNET_STREAM_HelloAckMessage *reply;
1564
1565   if (0 != memcmp (sender,
1566                    &socket->other_peer,
1567                    sizeof (struct GNUNET_PeerIdentity)))
1568   {
1569     LOG (GNUNET_ERROR_TYPE_DEBUG,
1570          "%s: Received HELLO_ACK from non-confirming peer\n",
1571          GNUNET_i2s (&socket->other_peer));
1572     return GNUNET_YES;
1573   }
1574   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1575   LOG (GNUNET_ERROR_TYPE_DEBUG,
1576        "%s: Received HELLO_ACK from %s\n",
1577        GNUNET_i2s (&socket->other_peer),
1578        GNUNET_i2s (&socket->other_peer));
1579
1580   GNUNET_assert (socket->tunnel == tunnel);
1581   switch (socket->state)
1582   {
1583   case STATE_HELLO_WAIT:
1584     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1585     LOG (GNUNET_ERROR_TYPE_DEBUG,
1586          "%s: Read sequence number %u\n",
1587          GNUNET_i2s (&socket->other_peer),
1588          (unsigned int) socket->read_sequence_number);
1589     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1590     reply = generate_hello_ack (socket, GNUNET_YES);
1591     queue_message (socket,
1592                    &reply->header, 
1593                    &set_state_established,
1594                    NULL);    
1595     return GNUNET_OK;
1596   case STATE_ESTABLISHED:
1597     // call statistics (# ACKs ignored++)
1598     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1599                    socket->control_retransmission_task_id);
1600     socket->control_retransmission_task_id =
1601       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1602     return GNUNET_OK;
1603   default:
1604     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1605                GNUNET_i2s (&socket->other_peer),
1606                GNUNET_i2s (&socket->other_peer),
1607                socket->state);
1608     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1609     return GNUNET_SYSERR;
1610   }
1611 }
1612
1613
1614 /**
1615  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1616  *
1617  * @param cls the socket (set from GNUNET_MESH_connect)
1618  * @param tunnel connection to the other end
1619  * @param tunnel_ctx this is NULL
1620  * @param sender who sent the message
1621  * @param message the actual message
1622  * @param atsi performance data for the connection
1623  * @return GNUNET_OK to keep the connection open,
1624  *         GNUNET_SYSERR to close it (signal serious error)
1625  */
1626 static int
1627 client_handle_reset (void *cls,
1628                      struct GNUNET_MESH_Tunnel *tunnel,
1629                      void **tunnel_ctx,
1630                      const struct GNUNET_PeerIdentity *sender,
1631                      const struct GNUNET_MessageHeader *message,
1632                      const struct GNUNET_ATS_Information*atsi)
1633 {
1634   // struct GNUNET_STREAM_Socket *socket = cls;
1635
1636   return GNUNET_OK;
1637 }
1638
1639
1640 /**
1641  * Common message handler for handling TRANSMIT_CLOSE messages
1642  *
1643  * @param socket the socket through which the ack was received
1644  * @param tunnel connection to the other end
1645  * @param sender who sent the message
1646  * @param msg the transmit close message
1647  * @param atsi performance data for the connection
1648  * @return GNUNET_OK to keep the connection open,
1649  *         GNUNET_SYSERR to close it (signal serious error)
1650  */
1651 static int
1652 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1653                        struct GNUNET_MESH_Tunnel *tunnel,
1654                        const struct GNUNET_PeerIdentity *sender,
1655                        const struct GNUNET_STREAM_MessageHeader *msg,
1656                        const struct GNUNET_ATS_Information*atsi)
1657 {
1658   struct GNUNET_STREAM_MessageHeader *reply;
1659
1660   switch (socket->state)
1661   {
1662   case STATE_ESTABLISHED:
1663     socket->state = STATE_RECEIVE_CLOSED;
1664
1665     /* Send TRANSMIT_CLOSE_ACK */
1666     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1667     reply->header.type = 
1668       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1669     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1670     queue_message (socket, reply, NULL, NULL);
1671     break;
1672
1673   default:
1674     /* FIXME: Call statistics? */
1675     break;
1676   }
1677   return GNUNET_YES;
1678 }
1679
1680
1681 /**
1682  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1683  *
1684  * @param cls the socket (set from GNUNET_MESH_connect)
1685  * @param tunnel connection to the other end
1686  * @param tunnel_ctx this is NULL
1687  * @param sender who sent the message
1688  * @param message the actual message
1689  * @param atsi performance data for the connection
1690  * @return GNUNET_OK to keep the connection open,
1691  *         GNUNET_SYSERR to close it (signal serious error)
1692  */
1693 static int
1694 client_handle_transmit_close (void *cls,
1695                               struct GNUNET_MESH_Tunnel *tunnel,
1696                               void **tunnel_ctx,
1697                               const struct GNUNET_PeerIdentity *sender,
1698                               const struct GNUNET_MessageHeader *message,
1699                               const struct GNUNET_ATS_Information*atsi)
1700 {
1701   struct GNUNET_STREAM_Socket *socket = cls;
1702   
1703   return handle_transmit_close (socket,
1704                                 tunnel,
1705                                 sender,
1706                                 (struct GNUNET_STREAM_MessageHeader *)message,
1707                                 atsi);
1708 }
1709
1710
1711 /**
1712  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1713  *
1714  * @param socket the socket
1715  * @param tunnel connection to the other end
1716  * @param sender who sent the message
1717  * @param message the actual message
1718  * @param atsi performance data for the connection
1719  * @param operation the close operation which is being ACK'ed
1720  * @return GNUNET_OK to keep the connection open,
1721  *         GNUNET_SYSERR to close it (signal serious error)
1722  */
1723 static int
1724 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1725                           struct GNUNET_MESH_Tunnel *tunnel,
1726                           const struct GNUNET_PeerIdentity *sender,
1727                           const struct GNUNET_STREAM_MessageHeader *message,
1728                           const struct GNUNET_ATS_Information *atsi,
1729                           int operation)
1730 {
1731   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1732
1733   shutdown_handle = socket->shutdown_handle;
1734   if (NULL == shutdown_handle)
1735   {
1736     LOG (GNUNET_ERROR_TYPE_DEBUG,
1737          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1738          GNUNET_i2s (&socket->other_peer));
1739     return GNUNET_OK;
1740   }
1741
1742   switch (operation)
1743   {
1744   case SHUT_RDWR:
1745     switch (socket->state)
1746     {
1747     case STATE_CLOSE_WAIT:
1748       if (SHUT_RDWR != shutdown_handle->operation)
1749       {
1750         LOG (GNUNET_ERROR_TYPE_DEBUG,
1751              "%s: Received CLOSE_ACK when shutdown handle is not for "
1752              "SHUT_RDWR\n",
1753              GNUNET_i2s (&socket->other_peer));
1754         return GNUNET_OK;
1755       }
1756
1757       LOG (GNUNET_ERROR_TYPE_DEBUG,
1758            "%s: Received CLOSE_ACK from %s\n",
1759            GNUNET_i2s (&socket->other_peer),
1760            GNUNET_i2s (&socket->other_peer));
1761       socket->state = STATE_CLOSED;
1762       break;
1763     default:
1764       LOG (GNUNET_ERROR_TYPE_DEBUG,
1765            "%s: Received CLOSE_ACK when in it not expected\n",
1766            GNUNET_i2s (&socket->other_peer));
1767       return GNUNET_OK;
1768     }
1769     break;
1770
1771   case SHUT_RD:
1772     switch (socket->state)
1773     {
1774     case STATE_RECEIVE_CLOSE_WAIT:
1775       if (SHUT_RD != shutdown_handle->operation)
1776       {
1777         LOG (GNUNET_ERROR_TYPE_DEBUG,
1778              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1779              "is not for SHUT_RD\n",
1780              GNUNET_i2s (&socket->other_peer));
1781         return GNUNET_OK;
1782       }
1783
1784       LOG (GNUNET_ERROR_TYPE_DEBUG,
1785            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1786            GNUNET_i2s (&socket->other_peer),
1787            GNUNET_i2s (&socket->other_peer));
1788       socket->state = STATE_RECEIVE_CLOSED;
1789       break;
1790     default:
1791       LOG (GNUNET_ERROR_TYPE_DEBUG,
1792            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1793            GNUNET_i2s (&socket->other_peer));
1794       return GNUNET_OK;
1795     }
1796
1797     break;
1798   case SHUT_WR:
1799     switch (socket->state)
1800     {
1801     case STATE_TRANSMIT_CLOSE_WAIT:
1802       if (SHUT_WR != shutdown_handle->operation)
1803       {
1804         LOG (GNUNET_ERROR_TYPE_DEBUG,
1805              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1806              "is not for SHUT_WR\n",
1807              GNUNET_i2s (&socket->other_peer));
1808         return GNUNET_OK;
1809       }
1810
1811       LOG (GNUNET_ERROR_TYPE_DEBUG,
1812            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1813            GNUNET_i2s (&socket->other_peer),
1814            GNUNET_i2s (&socket->other_peer));
1815       socket->state = STATE_TRANSMIT_CLOSED;
1816       break;
1817     default:
1818       LOG (GNUNET_ERROR_TYPE_DEBUG,
1819            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1820            GNUNET_i2s (&socket->other_peer));
1821           
1822       return GNUNET_OK;
1823     }
1824     break;
1825   default:
1826     GNUNET_assert (0);
1827   }
1828
1829   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1830     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1831                                    operation);
1832   if (GNUNET_SCHEDULER_NO_TASK
1833       != shutdown_handle->close_msg_retransmission_task_id)
1834   {
1835     GNUNET_SCHEDULER_cancel
1836       (shutdown_handle->close_msg_retransmission_task_id);
1837     shutdown_handle->close_msg_retransmission_task_id =
1838       GNUNET_SCHEDULER_NO_TASK;
1839   }
1840   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1841   socket->shutdown_handle = NULL;
1842   return GNUNET_OK;
1843 }
1844
1845
1846 /**
1847  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1848  *
1849  * @param cls the socket (set from GNUNET_MESH_connect)
1850  * @param tunnel connection to the other end
1851  * @param tunnel_ctx this is NULL
1852  * @param sender who sent the message
1853  * @param message the actual message
1854  * @param atsi performance data for the connection
1855  * @return GNUNET_OK to keep the connection open,
1856  *         GNUNET_SYSERR to close it (signal serious error)
1857  */
1858 static int
1859 client_handle_transmit_close_ack (void *cls,
1860                                   struct GNUNET_MESH_Tunnel *tunnel,
1861                                   void **tunnel_ctx,
1862                                   const struct GNUNET_PeerIdentity *sender,
1863                                   const struct GNUNET_MessageHeader *message,
1864                                   const struct GNUNET_ATS_Information*atsi)
1865 {
1866   struct GNUNET_STREAM_Socket *socket = cls;
1867
1868   return handle_generic_close_ack (socket,
1869                                    tunnel,
1870                                    sender,
1871                                    (const struct GNUNET_STREAM_MessageHeader *)
1872                                    message,
1873                                    atsi,
1874                                    SHUT_WR);
1875 }
1876
1877
1878 /**
1879  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1880  *
1881  * @param socket the socket
1882  * @param tunnel connection to the other end
1883  * @param sender who sent the message
1884  * @param message the actual message
1885  * @param atsi performance data for the connection
1886  * @return GNUNET_OK to keep the connection open,
1887  *         GNUNET_SYSERR to close it (signal serious error)
1888  */
1889 static int
1890 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1891                       struct GNUNET_MESH_Tunnel *tunnel,
1892                       const struct GNUNET_PeerIdentity *sender,
1893                       const struct GNUNET_STREAM_MessageHeader *message,
1894                       const struct GNUNET_ATS_Information *atsi)
1895 {
1896   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1897
1898   switch (socket->state)
1899   {
1900   case STATE_INIT:
1901   case STATE_LISTEN:
1902   case STATE_HELLO_WAIT:
1903     LOG (GNUNET_ERROR_TYPE_DEBUG,
1904          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1905          GNUNET_i2s (&socket->other_peer));
1906     return GNUNET_OK;
1907   default:
1908     break;
1909   }
1910   
1911   LOG (GNUNET_ERROR_TYPE_DEBUG,
1912        "%s: Received RECEIVE_CLOSE from %s\n",
1913        GNUNET_i2s (&socket->other_peer),
1914        GNUNET_i2s (&socket->other_peer));
1915   receive_close_ack =
1916     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1917   receive_close_ack->header.size =
1918     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1919   receive_close_ack->header.type =
1920     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1921   queue_message (socket,
1922                  receive_close_ack,
1923                  &set_state_closed,
1924                  NULL);
1925   
1926   /* FIXME: Handle the case where write handle is present; the write operation
1927      should be deemed as finised and the write continuation callback
1928      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1929   return GNUNET_OK;
1930 }
1931
1932
1933 /**
1934  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1935  *
1936  * @param cls the socket (set from GNUNET_MESH_connect)
1937  * @param tunnel connection to the other end
1938  * @param tunnel_ctx this is NULL
1939  * @param sender who sent the message
1940  * @param message the actual message
1941  * @param atsi performance data for the connection
1942  * @return GNUNET_OK to keep the connection open,
1943  *         GNUNET_SYSERR to close it (signal serious error)
1944  */
1945 static int
1946 client_handle_receive_close (void *cls,
1947                              struct GNUNET_MESH_Tunnel *tunnel,
1948                              void **tunnel_ctx,
1949                              const struct GNUNET_PeerIdentity *sender,
1950                              const struct GNUNET_MessageHeader *message,
1951                              const struct GNUNET_ATS_Information*atsi)
1952 {
1953   struct GNUNET_STREAM_Socket *socket = cls;
1954
1955   return
1956     handle_receive_close (socket,
1957                           tunnel,
1958                           sender,
1959                           (const struct GNUNET_STREAM_MessageHeader *) message,
1960                           atsi);
1961 }
1962
1963
1964 /**
1965  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1966  *
1967  * @param cls the socket (set from GNUNET_MESH_connect)
1968  * @param tunnel connection to the other end
1969  * @param tunnel_ctx this is NULL
1970  * @param sender who sent the message
1971  * @param message the actual message
1972  * @param atsi performance data for the connection
1973  * @return GNUNET_OK to keep the connection open,
1974  *         GNUNET_SYSERR to close it (signal serious error)
1975  */
1976 static int
1977 client_handle_receive_close_ack (void *cls,
1978                                  struct GNUNET_MESH_Tunnel *tunnel,
1979                                  void **tunnel_ctx,
1980                                  const struct GNUNET_PeerIdentity *sender,
1981                                  const struct GNUNET_MessageHeader *message,
1982                                  const struct GNUNET_ATS_Information*atsi)
1983 {
1984   struct GNUNET_STREAM_Socket *socket = cls;
1985
1986   return handle_generic_close_ack (socket,
1987                                    tunnel,
1988                                    sender,
1989                                    (const struct GNUNET_STREAM_MessageHeader *)
1990                                    message,
1991                                    atsi,
1992                                    SHUT_RD);
1993 }
1994
1995
1996 /**
1997  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1998  *
1999  * @param socket the socket
2000  * @param tunnel connection to the other end
2001  * @param sender who sent the message
2002  * @param message the actual message
2003  * @param atsi performance data for the connection
2004  * @return GNUNET_OK to keep the connection open,
2005  *         GNUNET_SYSERR to close it (signal serious error)
2006  */
2007 static int
2008 handle_close (struct GNUNET_STREAM_Socket *socket,
2009               struct GNUNET_MESH_Tunnel *tunnel,
2010               const struct GNUNET_PeerIdentity *sender,
2011               const struct GNUNET_STREAM_MessageHeader *message,
2012               const struct GNUNET_ATS_Information*atsi)
2013 {
2014   struct GNUNET_STREAM_MessageHeader *close_ack;
2015
2016   switch (socket->state)
2017   {
2018   case STATE_INIT:
2019   case STATE_LISTEN:
2020   case STATE_HELLO_WAIT:
2021     LOG (GNUNET_ERROR_TYPE_DEBUG,
2022          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2023          GNUNET_i2s (&socket->other_peer));
2024     return GNUNET_OK;
2025   default:
2026     break;
2027   }
2028
2029   LOG (GNUNET_ERROR_TYPE_DEBUG,
2030        "%s: Received CLOSE from %s\n",
2031        GNUNET_i2s (&socket->other_peer),
2032        GNUNET_i2s (&socket->other_peer));
2033   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2034   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2035   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2036   queue_message (socket,
2037                  close_ack,
2038                  &set_state_closed,
2039                  NULL);
2040   if (socket->state == STATE_CLOSED)
2041     return GNUNET_OK;
2042
2043   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2044   socket->receive_buffer = NULL;
2045   socket->receive_buffer_size = 0;
2046   return GNUNET_OK;
2047 }
2048
2049
2050 /**
2051  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2052  *
2053  * @param cls the socket (set from GNUNET_MESH_connect)
2054  * @param tunnel connection to the other end
2055  * @param tunnel_ctx this is NULL
2056  * @param sender who sent the message
2057  * @param message the actual message
2058  * @param atsi performance data for the connection
2059  * @return GNUNET_OK to keep the connection open,
2060  *         GNUNET_SYSERR to close it (signal serious error)
2061  */
2062 static int
2063 client_handle_close (void *cls,
2064                      struct GNUNET_MESH_Tunnel *tunnel,
2065                      void **tunnel_ctx,
2066                      const struct GNUNET_PeerIdentity *sender,
2067                      const struct GNUNET_MessageHeader *message,
2068                      const struct GNUNET_ATS_Information*atsi)
2069 {
2070   struct GNUNET_STREAM_Socket *socket = cls;
2071
2072   return handle_close (socket,
2073                        tunnel,
2074                        sender,
2075                        (const struct GNUNET_STREAM_MessageHeader *) message,
2076                        atsi);
2077 }
2078
2079
2080 /**
2081  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2082  *
2083  * @param cls the socket (set from GNUNET_MESH_connect)
2084  * @param tunnel connection to the other end
2085  * @param tunnel_ctx this is NULL
2086  * @param sender who sent the message
2087  * @param message the actual message
2088  * @param atsi performance data for the connection
2089  * @return GNUNET_OK to keep the connection open,
2090  *         GNUNET_SYSERR to close it (signal serious error)
2091  */
2092 static int
2093 client_handle_close_ack (void *cls,
2094                          struct GNUNET_MESH_Tunnel *tunnel,
2095                          void **tunnel_ctx,
2096                          const struct GNUNET_PeerIdentity *sender,
2097                          const struct GNUNET_MessageHeader *message,
2098                          const struct GNUNET_ATS_Information *atsi)
2099 {
2100   struct GNUNET_STREAM_Socket *socket = cls;
2101
2102   return handle_generic_close_ack (socket,
2103                                    tunnel,
2104                                    sender,
2105                                    (const struct GNUNET_STREAM_MessageHeader *) 
2106                                    message,
2107                                    atsi,
2108                                    SHUT_RDWR);
2109 }
2110
2111 /*****************************/
2112 /* Server's Message Handlers */
2113 /*****************************/
2114
2115 /**
2116  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2117  *
2118  * @param cls the closure
2119  * @param tunnel connection to the other end
2120  * @param tunnel_ctx the socket
2121  * @param sender who sent the message
2122  * @param message the actual message
2123  * @param atsi performance data for the connection
2124  * @return GNUNET_OK to keep the connection open,
2125  *         GNUNET_SYSERR to close it (signal serious error)
2126  */
2127 static int
2128 server_handle_data (void *cls,
2129                     struct GNUNET_MESH_Tunnel *tunnel,
2130                     void **tunnel_ctx,
2131                     const struct GNUNET_PeerIdentity *sender,
2132                     const struct GNUNET_MessageHeader *message,
2133                     const struct GNUNET_ATS_Information*atsi)
2134 {
2135   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2136
2137   return handle_data (socket,
2138                       tunnel,
2139                       sender,
2140                       (const struct GNUNET_STREAM_DataMessage *)message,
2141                       atsi);
2142 }
2143
2144
2145 /**
2146  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2147  *
2148  * @param cls the closure
2149  * @param tunnel connection to the other end
2150  * @param tunnel_ctx the socket
2151  * @param sender who sent the message
2152  * @param message the actual message
2153  * @param atsi performance data for the connection
2154  * @return GNUNET_OK to keep the connection open,
2155  *         GNUNET_SYSERR to close it (signal serious error)
2156  */
2157 static int
2158 server_handle_hello (void *cls,
2159                      struct GNUNET_MESH_Tunnel *tunnel,
2160                      void **tunnel_ctx,
2161                      const struct GNUNET_PeerIdentity *sender,
2162                      const struct GNUNET_MessageHeader *message,
2163                      const struct GNUNET_ATS_Information*atsi)
2164 {
2165   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2166   struct GNUNET_STREAM_HelloAckMessage *reply;
2167
2168   if (0 != memcmp (sender,
2169                    &socket->other_peer,
2170                    sizeof (struct GNUNET_PeerIdentity)))
2171   {
2172     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2173                GNUNET_i2s (&socket->other_peer));
2174     return GNUNET_YES;
2175   }
2176   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2177   GNUNET_assert (socket->tunnel == tunnel);
2178   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2179              GNUNET_i2s (&socket->other_peer));
2180   switch (socket->status)
2181   {
2182   case STATE_INIT:
2183     reply = generate_hello_ack (socket, GNUNET_YES);
2184     queue_message (socket, &reply->header, &set_state_hello_wait, NULL);
2185     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2186                    socket->control_retransmission_task_id);
2187     socket->control_retransmission_task_id =
2188       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2189                                     &control_retransmission_task, socket);
2190     break;
2191   case STATE_HELLO_WAIT:
2192     /* Perhaps our HELLO_ACK was lost */
2193     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2194                    socket->control_retransmission_task_id);
2195     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2196     socket->control_retransmission_task_id =
2197       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2198     break;
2199   default:
2200     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2201                GNUNET_i2s (&socket->other_peer), socket->state);
2202     /* FIXME: Send RESET? */
2203   }
2204   return GNUNET_OK;
2205 }
2206
2207
2208 /**
2209  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2210  *
2211  * @param cls the closure
2212  * @param tunnel connection to the other end
2213  * @param tunnel_ctx the socket
2214  * @param sender who sent the message
2215  * @param message the actual message
2216  * @param atsi performance data for the connection
2217  * @return GNUNET_OK to keep the connection open,
2218  *         GNUNET_SYSERR to close it (signal serious error)
2219  */
2220 static int
2221 server_handle_hello_ack (void *cls,
2222                          struct GNUNET_MESH_Tunnel *tunnel,
2223                          void **tunnel_ctx,
2224                          const struct GNUNET_PeerIdentity *sender,
2225                          const struct GNUNET_MessageHeader *message,
2226                          const struct GNUNET_ATS_Information*atsi)
2227 {
2228   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2229   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2230
2231   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2232                  ntohs (message->type));
2233   GNUNET_assert (socket->tunnel == tunnel);
2234   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2235   switch (socket->state)  
2236   {
2237   case STATE_HELLO_WAIT:
2238     LOG (GNUNET_ERROR_TYPE_DEBUG,
2239          "%s: Received HELLO_ACK from %s\n",
2240          GNUNET_i2s (&socket->other_peer),
2241          GNUNET_i2s (&socket->other_peer));
2242     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2243     LOG (GNUNET_ERROR_TYPE_DEBUG,
2244          "%s: Read sequence number %u\n",
2245          GNUNET_i2s (&socket->other_peer),
2246          (unsigned int) socket->read_sequence_number);
2247     socket->receiver_window_available = 
2248       ntohl (ack_message->receiver_window_size);
2249     set_state_established (NULL, socket);
2250     break;
2251   default:
2252     LOG (GNUNET_ERROR_TYPE_DEBUG,
2253          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2254   }
2255   return GNUNET_OK;
2256 }
2257
2258
2259 /**
2260  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2261  *
2262  * @param cls the closure
2263  * @param tunnel connection to the other end
2264  * @param tunnel_ctx the socket
2265  * @param sender who sent the message
2266  * @param message the actual message
2267  * @param atsi performance data for the connection
2268  * @return GNUNET_OK to keep the connection open,
2269  *         GNUNET_SYSERR to close it (signal serious error)
2270  */
2271 static int
2272 server_handle_reset (void *cls,
2273                      struct GNUNET_MESH_Tunnel *tunnel,
2274                      void **tunnel_ctx,
2275                      const struct GNUNET_PeerIdentity *sender,
2276                      const struct GNUNET_MessageHeader *message,
2277                      const struct GNUNET_ATS_Information*atsi)
2278 {
2279   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2280
2281   return GNUNET_OK;
2282 }
2283
2284
2285 /**
2286  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2287  *
2288  * @param cls the closure
2289  * @param tunnel connection to the other end
2290  * @param tunnel_ctx the socket
2291  * @param sender who sent the message
2292  * @param message the actual message
2293  * @param atsi performance data for the connection
2294  * @return GNUNET_OK to keep the connection open,
2295  *         GNUNET_SYSERR to close it (signal serious error)
2296  */
2297 static int
2298 server_handle_transmit_close (void *cls,
2299                               struct GNUNET_MESH_Tunnel *tunnel,
2300                               void **tunnel_ctx,
2301                               const struct GNUNET_PeerIdentity *sender,
2302                               const struct GNUNET_MessageHeader *message,
2303                               const struct GNUNET_ATS_Information*atsi)
2304 {
2305   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2306
2307   return handle_transmit_close (socket,
2308                                 tunnel,
2309                                 sender,
2310                                 (struct GNUNET_STREAM_MessageHeader *)message,
2311                                 atsi);
2312 }
2313
2314
2315 /**
2316  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2317  *
2318  * @param cls the closure
2319  * @param tunnel connection to the other end
2320  * @param tunnel_ctx the socket
2321  * @param sender who sent the message
2322  * @param message the actual message
2323  * @param atsi performance data for the connection
2324  * @return GNUNET_OK to keep the connection open,
2325  *         GNUNET_SYSERR to close it (signal serious error)
2326  */
2327 static int
2328 server_handle_transmit_close_ack (void *cls,
2329                                   struct GNUNET_MESH_Tunnel *tunnel,
2330                                   void **tunnel_ctx,
2331                                   const struct GNUNET_PeerIdentity *sender,
2332                                   const struct GNUNET_MessageHeader *message,
2333                                   const struct GNUNET_ATS_Information*atsi)
2334 {
2335   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2336
2337   return handle_generic_close_ack (socket,
2338                                    tunnel,
2339                                    sender,
2340                                    (const struct GNUNET_STREAM_MessageHeader *)
2341                                    message,
2342                                    atsi,
2343                                    SHUT_WR);
2344 }
2345
2346
2347 /**
2348  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2349  *
2350  * @param cls the closure
2351  * @param tunnel connection to the other end
2352  * @param tunnel_ctx the socket
2353  * @param sender who sent the message
2354  * @param message the actual message
2355  * @param atsi performance data for the connection
2356  * @return GNUNET_OK to keep the connection open,
2357  *         GNUNET_SYSERR to close it (signal serious error)
2358  */
2359 static int
2360 server_handle_receive_close (void *cls,
2361                              struct GNUNET_MESH_Tunnel *tunnel,
2362                              void **tunnel_ctx,
2363                              const struct GNUNET_PeerIdentity *sender,
2364                              const struct GNUNET_MessageHeader *message,
2365                              const struct GNUNET_ATS_Information*atsi)
2366 {
2367   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2368
2369   return
2370     handle_receive_close (socket,
2371                           tunnel,
2372                           sender,
2373                           (const struct GNUNET_STREAM_MessageHeader *) message,
2374                           atsi);
2375 }
2376
2377
2378 /**
2379  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2380  *
2381  * @param cls the closure
2382  * @param tunnel connection to the other end
2383  * @param tunnel_ctx the socket
2384  * @param sender who sent the message
2385  * @param message the actual message
2386  * @param atsi performance data for the connection
2387  * @return GNUNET_OK to keep the connection open,
2388  *         GNUNET_SYSERR to close it (signal serious error)
2389  */
2390 static int
2391 server_handle_receive_close_ack (void *cls,
2392                                  struct GNUNET_MESH_Tunnel *tunnel,
2393                                  void **tunnel_ctx,
2394                                  const struct GNUNET_PeerIdentity *sender,
2395                                  const struct GNUNET_MessageHeader *message,
2396                                  const struct GNUNET_ATS_Information*atsi)
2397 {
2398   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2399
2400   return handle_generic_close_ack (socket,
2401                                    tunnel,
2402                                    sender,
2403                                    (const struct GNUNET_STREAM_MessageHeader *)
2404                                    message,
2405                                    atsi,
2406                                    SHUT_RD);
2407 }
2408
2409
2410 /**
2411  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2412  *
2413  * @param cls the listen socket (from GNUNET_MESH_connect in
2414  *          GNUNET_STREAM_listen) 
2415  * @param tunnel connection to the other end
2416  * @param tunnel_ctx the socket
2417  * @param sender who sent the message
2418  * @param message the actual message
2419  * @param atsi performance data for the connection
2420  * @return GNUNET_OK to keep the connection open,
2421  *         GNUNET_SYSERR to close it (signal serious error)
2422  */
2423 static int
2424 server_handle_close (void *cls,
2425                      struct GNUNET_MESH_Tunnel *tunnel,
2426                      void **tunnel_ctx,
2427                      const struct GNUNET_PeerIdentity *sender,
2428                      const struct GNUNET_MessageHeader *message,
2429                      const struct GNUNET_ATS_Information*atsi)
2430 {
2431   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2432   
2433   return handle_close (socket,
2434                        tunnel,
2435                        sender,
2436                        (const struct GNUNET_STREAM_MessageHeader *) message,
2437                        atsi);
2438 }
2439
2440
2441 /**
2442  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2443  *
2444  * @param cls the closure
2445  * @param tunnel connection to the other end
2446  * @param tunnel_ctx the socket
2447  * @param sender who sent the message
2448  * @param message the actual message
2449  * @param atsi performance data for the connection
2450  * @return GNUNET_OK to keep the connection open,
2451  *         GNUNET_SYSERR to close it (signal serious error)
2452  */
2453 static int
2454 server_handle_close_ack (void *cls,
2455                          struct GNUNET_MESH_Tunnel *tunnel,
2456                          void **tunnel_ctx,
2457                          const struct GNUNET_PeerIdentity *sender,
2458                          const struct GNUNET_MessageHeader *message,
2459                          const struct GNUNET_ATS_Information*atsi)
2460 {
2461   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2462
2463   return handle_generic_close_ack (socket,
2464                                    tunnel,
2465                                    sender,
2466                                    (const struct GNUNET_STREAM_MessageHeader *) 
2467                                    message,
2468                                    atsi,
2469                                    SHUT_RDWR);
2470 }
2471
2472
2473 /**
2474  * Handler for DATA_ACK messages
2475  *
2476  * @param socket the socket through which the ack was received
2477  * @param tunnel connection to the other end
2478  * @param sender who sent the message
2479  * @param ack the acknowledgment message
2480  * @param atsi performance data for the connection
2481  * @return GNUNET_OK to keep the connection open,
2482  *         GNUNET_SYSERR to close it (signal serious error)
2483  */
2484 static int
2485 handle_ack (struct GNUNET_STREAM_Socket *socket,
2486             struct GNUNET_MESH_Tunnel *tunnel,
2487             const struct GNUNET_PeerIdentity *sender,
2488             const struct GNUNET_STREAM_AckMessage *ack,
2489             const struct GNUNET_ATS_Information*atsi)
2490 {
2491   unsigned int packet;
2492   int need_retransmission;
2493   uint32_t sequence_difference;
2494   
2495   if (0 != memcmp (sender,
2496                    &socket->other_peer,
2497                    sizeof (struct GNUNET_PeerIdentity)))
2498   {
2499     LOG (GNUNET_ERROR_TYPE_DEBUG,
2500          "%s: Received ACK from non-confirming peer\n",
2501          GNUNET_i2s (&socket->other_peer));
2502     return GNUNET_YES;
2503   }
2504   switch (socket->state)
2505   {
2506   case (STATE_ESTABLISHED):
2507   case (STATE_RECEIVE_CLOSED):
2508   case (STATE_RECEIVE_CLOSE_WAIT):
2509     if (NULL == socket->write_handle)
2510     {
2511       LOG (GNUNET_ERROR_TYPE_DEBUG,
2512            "%s: Received DATA_ACK when write_handle is NULL\n",
2513            GNUNET_i2s (&socket->other_peer));
2514       return GNUNET_OK;
2515     }
2516     if (!((socket->write_sequence_number 
2517            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2518     {
2519       LOG (GNUNET_ERROR_TYPE_DEBUG,
2520            "%s: Received DATA_ACK with unexpected base sequence number\n",
2521            GNUNET_i2s (&socket->other_peer));
2522       LOG (GNUNET_ERROR_TYPE_DEBUG,
2523            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2524            GNUNET_i2s (&socket->other_peer),
2525            socket->write_sequence_number,
2526            ntohl (ack->base_sequence_number));
2527       return GNUNET_OK;
2528     }
2529     /* FIXME: include the case when write_handle is cancelled - ignore the 
2530        acks */
2531     LOG (GNUNET_ERROR_TYPE_DEBUG,
2532          "%s: Received DATA_ACK from %s\n",
2533          GNUNET_i2s (&socket->other_peer),
2534          GNUNET_i2s (&socket->other_peer));
2535       
2536     /* Cancel the retransmission task */
2537     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2538     {
2539       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2540       socket->data_retransmission_task_id = 
2541         GNUNET_SCHEDULER_NO_TASK;
2542     }
2543     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2544     {
2545       if (NULL == socket->write_handle->messages[packet]) break;
2546       /* BS: Base sequence from ack; PS: sequence num of current packet */
2547       sequence_difference = ntohl (ack->base_sequence_number)
2548         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2549       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2550       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2551           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2552               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2553       {
2554         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2555                               GNUNET_YES);
2556         continue;
2557       }
2558       if (GNUNET_YES == 
2559           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2560                                 -sequence_difference))/*inversion as PS >= BS */
2561       {
2562         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2563                               GNUNET_YES);
2564       }
2565     }
2566     /* Update the receive window remaining
2567        FIXME : Should update with the value from a data ack with greater
2568        sequence number */
2569     socket->receiver_window_available = 
2570       ntohl (ack->receive_window_remaining);
2571     /* Check if we have received all acknowledgements */
2572     need_retransmission = GNUNET_NO;
2573     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2574     {
2575       if (NULL == socket->write_handle->messages[packet]) break;
2576       if (GNUNET_YES != ackbitmap_is_bit_set 
2577           (&socket->write_handle->ack_bitmap,packet))
2578       {
2579         need_retransmission = GNUNET_YES;
2580         break;
2581       }
2582     }
2583     if (GNUNET_YES == need_retransmission)
2584     {
2585       write_data (socket);
2586     }
2587     else      /* We have to call the write continuation callback now */
2588     {
2589       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2590       
2591       /* Free the packets */
2592       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2593       {
2594         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2595       }
2596       write_handle = socket->write_handle;
2597       socket->write_handle = NULL;
2598       if (NULL != write_handle->write_cont)
2599         write_handle->write_cont (write_handle->write_cont_cls,
2600                                   socket->status,
2601                                   write_handle->size);
2602       /* We are done with the write handle - Freeing it */
2603       GNUNET_free (write_handle);
2604       LOG (GNUNET_ERROR_TYPE_DEBUG,
2605            "%s: Write completion callback completed\n",
2606            GNUNET_i2s (&socket->other_peer));      
2607     }
2608     break;
2609   default:
2610     break;
2611   }
2612   return GNUNET_OK;
2613 }
2614
2615
2616 /**
2617  * Handler for DATA_ACK messages
2618  *
2619  * @param cls the 'struct GNUNET_STREAM_Socket'
2620  * @param tunnel connection to the other end
2621  * @param tunnel_ctx unused
2622  * @param sender who sent the message
2623  * @param message the actual message
2624  * @param atsi performance data for the connection
2625  * @return GNUNET_OK to keep the connection open,
2626  *         GNUNET_SYSERR to close it (signal serious error)
2627  */
2628 static int
2629 client_handle_ack (void *cls,
2630                    struct GNUNET_MESH_Tunnel *tunnel,
2631                    void **tunnel_ctx,
2632                    const struct GNUNET_PeerIdentity *sender,
2633                    const struct GNUNET_MessageHeader *message,
2634                    const struct GNUNET_ATS_Information*atsi)
2635 {
2636   struct GNUNET_STREAM_Socket *socket = cls;
2637   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2638  
2639   return handle_ack (socket, tunnel, sender, ack, atsi);
2640 }
2641
2642
2643 /**
2644  * Handler for DATA_ACK messages
2645  *
2646  * @param cls the server's listen socket
2647  * @param tunnel connection to the other end
2648  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2649  * @param sender who sent the message
2650  * @param message the actual message
2651  * @param atsi performance data for the connection
2652  * @return GNUNET_OK to keep the connection open,
2653  *         GNUNET_SYSERR to close it (signal serious error)
2654  */
2655 static int
2656 server_handle_ack (void *cls,
2657                    struct GNUNET_MESH_Tunnel *tunnel,
2658                    void **tunnel_ctx,
2659                    const struct GNUNET_PeerIdentity *sender,
2660                    const struct GNUNET_MessageHeader *message,
2661                    const struct GNUNET_ATS_Information*atsi)
2662 {
2663   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2664   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2665  
2666   return handle_ack (socket, tunnel, sender, ack, atsi);
2667 }
2668
2669
2670 /**
2671  * For client message handlers, the stream socket is in the
2672  * closure argument.
2673  */
2674 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2675   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2676   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2677    sizeof (struct GNUNET_STREAM_AckMessage) },
2678   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2679    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2680   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2681    sizeof (struct GNUNET_STREAM_MessageHeader)},
2682   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2683    sizeof (struct GNUNET_STREAM_MessageHeader)},
2684   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2685    sizeof (struct GNUNET_STREAM_MessageHeader)},
2686   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2687    sizeof (struct GNUNET_STREAM_MessageHeader)},
2688   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2689    sizeof (struct GNUNET_STREAM_MessageHeader)},
2690   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2691    sizeof (struct GNUNET_STREAM_MessageHeader)},
2692   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2693    sizeof (struct GNUNET_STREAM_MessageHeader)},
2694   {NULL, 0, 0}
2695 };
2696
2697
2698 /**
2699  * For server message handlers, the stream socket is in the
2700  * tunnel context, and the listen socket in the closure argument.
2701  */
2702 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2703   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2704   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2705    sizeof (struct GNUNET_STREAM_AckMessage) },
2706   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2707    sizeof (struct GNUNET_STREAM_MessageHeader)},
2708   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2709    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2710   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2711    sizeof (struct GNUNET_STREAM_MessageHeader)},
2712   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2713    sizeof (struct GNUNET_STREAM_MessageHeader)},
2714   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2715    sizeof (struct GNUNET_STREAM_MessageHeader)},
2716   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2717    sizeof (struct GNUNET_STREAM_MessageHeader)},
2718   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2719    sizeof (struct GNUNET_STREAM_MessageHeader)},
2720   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2721    sizeof (struct GNUNET_STREAM_MessageHeader)},
2722   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2723    sizeof (struct GNUNET_STREAM_MessageHeader)},
2724   {NULL, 0, 0}
2725 };
2726
2727
2728 /**
2729  * Function called when our target peer is connected to our tunnel
2730  *
2731  * @param cls the socket for which this tunnel is created
2732  * @param peer the peer identity of the target
2733  * @param atsi performance data for the connection
2734  */
2735 static void
2736 mesh_peer_connect_callback (void *cls,
2737                             const struct GNUNET_PeerIdentity *peer,
2738                             const struct GNUNET_ATS_Information * atsi)
2739 {
2740   struct GNUNET_STREAM_Socket *socket = cls;
2741   struct GNUNET_STREAM_MessageHeader *message;
2742   
2743   if (0 != memcmp (peer,
2744                    &socket->other_peer,
2745                    sizeof (struct GNUNET_PeerIdentity)))
2746   {
2747     LOG (GNUNET_ERROR_TYPE_DEBUG,
2748          "%s: A peer which is not our target has connected to our tunnel\n",
2749          GNUNET_i2s(peer));
2750     return;
2751   }
2752   LOG (GNUNET_ERROR_TYPE_DEBUG,
2753        "%s: Target peer %s connected\n",
2754        GNUNET_i2s (&socket->other_peer),
2755        GNUNET_i2s (&socket->other_peer));
2756   /* Set state to INIT */
2757   socket->state = STATE_INIT;
2758   /* Send HELLO message */
2759   message = generate_hello ();
2760   queue_message (socket,
2761                  message,
2762                  &set_state_hello_wait,
2763                  NULL);
2764   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2765                  socket->control_retransmission_task_id);
2766   socket->control_retransmission_task_id =
2767     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2768                                   &control_retransmission_task, socket);
2769 }
2770
2771
2772 /**
2773  * Function called when our target peer is disconnected from our tunnel
2774  *
2775  * @param cls the socket associated which this tunnel
2776  * @param peer the peer identity of the target
2777  */
2778 static void
2779 mesh_peer_disconnect_callback (void *cls,
2780                                const struct GNUNET_PeerIdentity *peer)
2781 {
2782   struct GNUNET_STREAM_Socket *socket=cls;
2783   
2784   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2785   LOG (GNUNET_ERROR_TYPE_DEBUG,
2786        "%s: Other peer %s disconnected \n",
2787        GNUNET_i2s (&socket->other_peer),
2788        GNUNET_i2s (&socket->other_peer));
2789 }
2790
2791
2792 /**
2793  * Method called whenever a peer creates a tunnel to us
2794  *
2795  * @param cls closure
2796  * @param tunnel new handle to the tunnel
2797  * @param initiator peer that started the tunnel
2798  * @param atsi performance information for the tunnel
2799  * @return initial tunnel context for the tunnel
2800  *         (can be NULL -- that's not an error)
2801  */
2802 static void *
2803 new_tunnel_notify (void *cls,
2804                    struct GNUNET_MESH_Tunnel *tunnel,
2805                    const struct GNUNET_PeerIdentity *initiator,
2806                    const struct GNUNET_ATS_Information *atsi)
2807 {
2808   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2809   struct GNUNET_STREAM_Socket *socket;
2810
2811   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2812      from the same peer again until the socket is closed */
2813
2814   if (GNUNET_NO == lsocket->listening)
2815   {
2816     LOG (GNUNET_ERROR_TYPE_DEBUG,
2817          "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2818          GNUNET_i2s (&socket->other_peer),
2819          GNUNET_i2s (&socket->other_peer));
2820     GNUNET_MESH_tunnel_destroy (tunnel);
2821     return NULL;
2822   }
2823   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2824   socket->other_peer = *initiator;
2825   socket->tunnel = tunnel;
2826   socket->session_id = 0;       /* FIXME */
2827   socket->state = STATE_INIT;
2828   socket->lsocket = lsocket;
2829   socket->retransmit_timeout = lsocket->retransmit_timeout;
2830   socket->testing_active = lsocket->testing_active;
2831   socket->testing_set_write_sequence_number_value =
2832     lsocket->testing_set_write_sequence_number_value;    
2833   LOG (GNUNET_ERROR_TYPE_DEBUG,
2834        "%s: Peer %s initiated tunnel to us\n", 
2835        GNUNET_i2s (&socket->other_peer),
2836        GNUNET_i2s (&socket->other_peer));
2837   /* FIXME: Copy MESH handle from lsocket to socket */
2838   return socket;
2839 }
2840
2841
2842 /**
2843  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2844  * any associated state.  This function is NOT called if the client has
2845  * explicitly asked for the tunnel to be destroyed using
2846  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2847  * the tunnel.
2848  *
2849  * @param cls closure (set from GNUNET_MESH_connect)
2850  * @param tunnel connection to the other end (henceforth invalid)
2851  * @param tunnel_ctx place where local state associated
2852  *                   with the tunnel is stored
2853  */
2854 static void 
2855 tunnel_cleaner (void *cls,
2856                 const struct GNUNET_MESH_Tunnel *tunnel,
2857                 void *tunnel_ctx)
2858 {
2859   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2860
2861   if (tunnel != socket->tunnel)
2862     return;
2863
2864   GNUNET_break_op(0);
2865   LOG (GNUNET_ERROR_TYPE_DEBUG,
2866        "%s: Peer %s has terminated connection abruptly\n",
2867        GNUNET_i2s (&socket->other_peer),
2868        GNUNET_i2s (&socket->other_peer));
2869
2870   socket->status = GNUNET_STREAM_SHUTDOWN;
2871
2872   /* Clear Transmit handles */
2873   if (NULL != socket->transmit_handle)
2874   {
2875     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2876     socket->transmit_handle = NULL;
2877   }
2878   if (NULL != socket->ack_transmit_handle)
2879   {
2880     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2881     GNUNET_free (socket->ack_msg);
2882     socket->ack_msg = NULL;
2883     socket->ack_transmit_handle = NULL;
2884   }
2885   /* Stop Tasks using socket->tunnel */
2886   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2887   {
2888     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2889     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2890   }
2891   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2892   {
2893     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2894     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2895   }
2896   /* FIXME: Cancel all other tasks using socket->tunnel */
2897   socket->tunnel = NULL;
2898 }
2899
2900
2901 /**
2902  * Callback to signal timeout on lockmanager lock acquire
2903  *
2904  * @param cls the ListenSocket
2905  * @param tc the scheduler task context
2906  */
2907 static void
2908 lockmanager_acquire_timeout (void *cls, 
2909                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2910 {
2911   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2912   GNUNET_STREAM_ListenCallback listen_cb;
2913   void *listen_cb_cls;
2914
2915   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2916   listen_cb = lsocket->listen_cb;
2917   listen_cb_cls = lsocket->listen_cb_cls;
2918   GNUNET_STREAM_listen_close (lsocket);
2919   if (NULL != listen_cb)
2920     listen_cb (listen_cb_cls, NULL, NULL);  
2921 }
2922
2923
2924 /**
2925  * Callback to notify us on the status changes on app_port lock
2926  *
2927  * @param cls the ListenSocket
2928  * @param domain the domain name of the lock
2929  * @param lock the app_port
2930  * @param status the current status of the lock
2931  */
2932 static void
2933 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2934                        enum GNUNET_LOCKMANAGER_Status status)
2935 {
2936   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2937
2938   GNUNET_assert (lock == (uint32_t) lsocket->port);
2939   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2940   {
2941     lsocket->listening = GNUNET_YES;
2942     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2943     {
2944       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2945       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2946     }
2947     if (NULL == lsocket->mesh)
2948     {
2949       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2950
2951       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2952                                            RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
2953                                            lsocket, /* Closure */
2954                                            &new_tunnel_notify,
2955                                            &tunnel_cleaner,
2956                                            server_message_handlers,
2957                                            ports);
2958       GNUNET_assert (NULL != lsocket->mesh);
2959       if (NULL != lsocket->listen_ok_cb)
2960       {
2961         (void) lsocket->listen_ok_cb ();
2962       }
2963     }
2964   }
2965   if (GNUNET_LOCKMANAGER_RELEASE == status)
2966     lsocket->listening = GNUNET_NO;
2967 }
2968
2969
2970 /*****************/
2971 /* API functions */
2972 /*****************/
2973
2974
2975 /**
2976  * Tries to open a stream to the target peer
2977  *
2978  * @param cfg configuration to use
2979  * @param target the target peer to which the stream has to be opened
2980  * @param app_port the application port number which uniquely identifies this
2981  *            stream
2982  * @param open_cb this function will be called after stream has be established;
2983  *          cannot be NULL
2984  * @param open_cb_cls the closure for open_cb
2985  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2986  * @return if successful it returns the stream socket; NULL if stream cannot be
2987  *         opened 
2988  */
2989 struct GNUNET_STREAM_Socket *
2990 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2991                     const struct GNUNET_PeerIdentity *target,
2992                     GNUNET_MESH_ApplicationType app_port,
2993                     GNUNET_STREAM_OpenCallback open_cb,
2994                     void *open_cb_cls,
2995                     ...)
2996 {
2997   struct GNUNET_STREAM_Socket *socket;
2998   enum GNUNET_STREAM_Option option;
2999   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3000   va_list vargs;
3001
3002   LOG (GNUNET_ERROR_TYPE_DEBUG,
3003        "%s\n", __func__);
3004   GNUNET_assert (NULL != open_cb);
3005   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3006   socket->other_peer = *target;
3007   socket->open_cb = open_cb;
3008   socket->open_cls = open_cb_cls;
3009   /* Set defaults */
3010   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3011   socket->testing_active = GNUNET_NO;
3012   va_start (vargs, open_cb_cls); /* Parse variable args */
3013   do {
3014     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3015     switch (option)
3016     {
3017     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3018       /* Expect struct GNUNET_TIME_Relative */
3019       socket->retransmit_timeout = va_arg (vargs,
3020                                            struct GNUNET_TIME_Relative);
3021       break;
3022     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3023       socket->testing_active = GNUNET_YES;
3024       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3025                                                                 uint32_t);
3026       break;
3027     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3028       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3029       break;
3030     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3031       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3032       break;
3033     case GNUNET_STREAM_OPTION_END:
3034       break;
3035     }
3036   } while (GNUNET_STREAM_OPTION_END != option);
3037   va_end (vargs);               /* End of variable args parsing */
3038   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3039                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
3040                                       socket, /* cls */
3041                                       NULL, /* No inbound tunnel handler */
3042                                       NULL, /* No in-tunnel cleaner */
3043                                       client_message_handlers,
3044                                       ports); /* We don't get inbound tunnels */
3045   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3046   {
3047     GNUNET_free (socket);
3048     return NULL;
3049   }
3050   /* Now create the mesh tunnel to target */
3051   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3052   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3053                                               NULL, /* Tunnel context */
3054                                               &mesh_peer_connect_callback,
3055                                               &mesh_peer_disconnect_callback,
3056                                               socket);
3057   GNUNET_assert (NULL != socket->tunnel);
3058   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3059                                         &socket->other_peer);
3060   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3061   return socket;
3062 }
3063
3064
3065 /**
3066  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3067  *
3068  * @param socket the stream socket
3069  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3070  * @param completion_cb the callback that will be called upon successful
3071  *          shutdown of given operation
3072  * @param completion_cls the closure for the completion callback
3073  * @return the shutdown handle
3074  */
3075 struct GNUNET_STREAM_ShutdownHandle *
3076 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3077                         int operation,
3078                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3079                         void *completion_cls)
3080 {
3081   struct GNUNET_STREAM_ShutdownHandle *handle;
3082   struct GNUNET_STREAM_MessageHeader *msg;
3083   
3084   GNUNET_assert (NULL == socket->shutdown_handle);
3085
3086   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3087   handle->socket = socket;
3088   handle->completion_cb = completion_cb;
3089   handle->completion_cls = completion_cls;
3090   socket->shutdown_handle = handle;
3091
3092   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3093   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3094   switch (operation)
3095   {
3096   case SHUT_RD:
3097     handle->operation = SHUT_RD;
3098     if (NULL != socket->read_handle)
3099       LOG (GNUNET_ERROR_TYPE_WARNING,
3100            "Existing read handle should be cancelled before shutting"
3101            " down reading\n");
3102     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3103     queue_message (socket,
3104                    msg,
3105                    &set_state_receive_close_wait,
3106                    NULL);
3107     break;
3108   case SHUT_WR:
3109     handle->operation = SHUT_WR;
3110     if (NULL != socket->write_handle)
3111       LOG (GNUNET_ERROR_TYPE_WARNING,
3112            "Existing write handle should be cancelled before shutting"
3113            " down writing\n");
3114     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3115     queue_message (socket,
3116                    msg,
3117                    &set_state_transmit_close_wait,
3118                    NULL);
3119     break;
3120   case SHUT_RDWR:
3121     handle->operation = SHUT_RDWR;
3122     if (NULL != socket->write_handle)
3123       LOG (GNUNET_ERROR_TYPE_WARNING,
3124            "Existing write handle should be cancelled before shutting"
3125            " down writing\n");
3126     if (NULL != socket->read_handle)
3127       LOG (GNUNET_ERROR_TYPE_WARNING,
3128            "Existing read handle should be cancelled before shutting"
3129            " down reading\n");
3130     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3131     queue_message (socket,
3132                    msg,
3133                    &set_state_close_wait,
3134                    NULL);
3135     break;
3136   default:
3137     LOG (GNUNET_ERROR_TYPE_WARNING,
3138          "GNUNET_STREAM_shutdown called with invalid value for "
3139          "parameter operation -- Ignoring\n");
3140     GNUNET_free (msg);
3141     GNUNET_free (handle);
3142     return NULL;
3143   }
3144   handle->close_msg_retransmission_task_id =
3145     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3146                                   &close_msg_retransmission_task,
3147                                   handle);
3148   return handle;
3149 }
3150
3151
3152 /**
3153  * Cancels a pending shutdown
3154  *
3155  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3156  */
3157 void
3158 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3159 {
3160   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3161     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3162   handle->socket->shutdown_handle = NULL;
3163   GNUNET_free (handle);
3164 }
3165
3166
3167 /**
3168  * Closes the stream
3169  *
3170  * @param socket the stream socket
3171  */
3172 void
3173 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3174 {
3175   struct MessageQueue *head;
3176
3177   if (NULL != socket->read_handle)
3178   {
3179     LOG (GNUNET_ERROR_TYPE_WARNING,
3180          "Closing STREAM socket when a read handle is pending\n");
3181   }
3182   if (NULL != socket->write_handle)
3183   {
3184     LOG (GNUNET_ERROR_TYPE_WARNING,
3185          "Closing STREAM socket when a write handle is pending\n");
3186     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3187     //socket->write_handle = NULL;
3188   }
3189   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3190   {
3191     /* socket closed with read task pending!? */
3192     GNUNET_break (0);
3193     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3194     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3195   }  
3196   /* Terminate the ack'ing tasks if they are still present */
3197   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3198   {
3199     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3200     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3201   }
3202   /* Terminate the control retransmission tasks */
3203   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3204   {
3205     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3206   }
3207   /* Clear Transmit handles */
3208   if (NULL != socket->transmit_handle)
3209   {
3210     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3211     socket->transmit_handle = NULL;
3212   }
3213   if (NULL != socket->ack_transmit_handle)
3214   {
3215     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3216     GNUNET_free (socket->ack_msg);
3217     socket->ack_msg = NULL;
3218     socket->ack_transmit_handle = NULL;
3219   }
3220   /* Clear existing message queue */
3221   while (NULL != (head = socket->queue_head)) {
3222     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3223                                  socket->queue_tail,
3224                                  head);
3225     GNUNET_free (head->message);
3226     GNUNET_free (head);
3227   }
3228
3229   /* Close associated tunnel */
3230   if (NULL != socket->tunnel)
3231   {
3232     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3233     socket->tunnel = NULL;
3234   }
3235
3236   /* Close mesh connection */
3237   if (NULL != socket->mesh && NULL == socket->lsocket)
3238   {
3239     GNUNET_MESH_disconnect (socket->mesh);
3240     socket->mesh = NULL;
3241   }
3242   
3243   /* Release receive buffer */
3244   if (NULL != socket->receive_buffer)
3245   {
3246     GNUNET_free (socket->receive_buffer);
3247   }
3248
3249   GNUNET_free (socket);
3250 }
3251
3252
3253 /**
3254  * Listens for stream connections for a specific application ports
3255  *
3256  * @param cfg the configuration to use
3257  * @param app_port the application port for which new streams will be accepted
3258  * @param listen_cb this function will be called when a peer tries to establish
3259  *            a stream with us
3260  * @param listen_cb_cls closure for listen_cb
3261  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3262  * @return listen socket, NULL for any error
3263  */
3264 struct GNUNET_STREAM_ListenSocket *
3265 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3266                       GNUNET_MESH_ApplicationType app_port,
3267                       GNUNET_STREAM_ListenCallback listen_cb,
3268                       void *listen_cb_cls,
3269                       ...)
3270 {
3271   /* FIXME: Add variable args for passing configration options? */
3272   struct GNUNET_STREAM_ListenSocket *lsocket;
3273   struct GNUNET_TIME_Relative listen_timeout;
3274   enum GNUNET_STREAM_Option option;
3275   va_list vargs;
3276
3277   GNUNET_assert (NULL != listen_cb);
3278   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3279   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3280   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3281   if (NULL == lsocket->lockmanager)
3282   {
3283     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3284     GNUNET_free (lsocket);
3285     return NULL;
3286   }
3287   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3288   /* Set defaults */
3289   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3290   lsocket->testing_active = GNUNET_NO;
3291   lsocket->listen_ok_cb = NULL;
3292   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3293   va_start (vargs, listen_cb_cls);
3294   do {
3295     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3296     switch (option)
3297     {
3298     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3299       lsocket->retransmit_timeout = va_arg (vargs,
3300                                             struct GNUNET_TIME_Relative);
3301       break;
3302     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3303       lsocket->testing_active = GNUNET_YES;
3304       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3305                                                                  uint32_t);
3306       break;
3307     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3308       listen_timeout = GNUNET_TIME_relative_multiply
3309         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3310       break;
3311     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3312       lsocket->listen_ok_cb = va_arg (vargs,
3313                                       GNUNET_STREAM_ListenSuccessCallback);
3314       break;
3315     case GNUNET_STREAM_OPTION_END:
3316       break;
3317     }
3318   } while (GNUNET_STREAM_OPTION_END != option);
3319   va_end (vargs);
3320   lsocket->port = app_port;
3321   lsocket->listen_cb = listen_cb;
3322   lsocket->listen_cb_cls = listen_cb_cls;
3323   lsocket->locking_request = 
3324     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3325                                      (uint32_t) lsocket->port,
3326                                      &lock_status_change_cb, lsocket);
3327   lsocket->lockmanager_acquire_timeout_task =
3328     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3329                                   &lockmanager_acquire_timeout, lsocket);
3330   return lsocket;
3331 }
3332
3333
3334 /**
3335  * Closes the listen socket
3336  *
3337  * @param lsocket the listen socket
3338  */
3339 void
3340 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3341 {
3342   /* Close MESH connection */
3343   if (NULL != lsocket->mesh)
3344     GNUNET_MESH_disconnect (lsocket->mesh);
3345   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3346   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3347     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3348   if (NULL != lsocket->locking_request)
3349     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3350   if (NULL != lsocket->lockmanager)
3351     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3352   GNUNET_free (lsocket);
3353 }
3354
3355
3356 /**
3357  * Tries to write the given data to the stream. The maximum size of data that
3358  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3359  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3360  * violation, however only the said number of maximum bytes will be written.
3361  *
3362  * @param socket the socket representing a stream
3363  * @param data the data buffer from where the data is written into the stream
3364  * @param size the number of bytes to be written from the data buffer
3365  * @param timeout the timeout period
3366  * @param write_cont the function to call upon writing some bytes into the
3367  *          stream 
3368  * @param write_cont_cls the closure
3369  *
3370  * @return handle to cancel the operation; if a previous write is pending or
3371  *           the stream has been shutdown for this operation then write_cont is
3372  *           immediately called and NULL is returned.
3373  */
3374 struct GNUNET_STREAM_IOWriteHandle *
3375 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3376                      const void *data,
3377                      size_t size,
3378                      struct GNUNET_TIME_Relative timeout,
3379                      GNUNET_STREAM_CompletionContinuation write_cont,
3380                      void *write_cont_cls)
3381 {
3382   unsigned int num_needed_packets;
3383   unsigned int packet;
3384   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3385   uint32_t packet_size;
3386   uint32_t payload_size;
3387   struct GNUNET_STREAM_DataMessage *data_msg;
3388   const void *sweep;
3389   struct GNUNET_TIME_Relative ack_deadline;
3390
3391   LOG (GNUNET_ERROR_TYPE_DEBUG,
3392        "%s\n", __func__);
3393
3394   /* Return NULL if there is already a write request pending */
3395   if (NULL != socket->write_handle)
3396   {
3397     GNUNET_break (0);
3398     return NULL;
3399   }
3400
3401   switch (socket->state)
3402   {
3403   case STATE_TRANSMIT_CLOSED:
3404   case STATE_TRANSMIT_CLOSE_WAIT:
3405   case STATE_CLOSED:
3406   case STATE_CLOSE_WAIT:
3407     if (NULL != write_cont)
3408       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3409     LOG (GNUNET_ERROR_TYPE_DEBUG,
3410          "%s() END\n", __func__);
3411     return NULL;
3412   case STATE_INIT:
3413   case STATE_LISTEN:
3414   case STATE_HELLO_WAIT:
3415     if (NULL != write_cont)
3416       /* FIXME: GNUNET_STREAM_SYSERR?? */
3417       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3418     LOG (GNUNET_ERROR_TYPE_DEBUG,
3419          "%s() END\n", __func__);
3420     return NULL;
3421   case STATE_ESTABLISHED:
3422   case STATE_RECEIVE_CLOSED:
3423   case STATE_RECEIVE_CLOSE_WAIT:
3424     break;
3425   }
3426
3427   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3428     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3429   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3430   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3431   io_handle->socket = socket;
3432   io_handle->write_cont = write_cont;
3433   io_handle->write_cont_cls = write_cont_cls;
3434   io_handle->size = size;
3435   sweep = data;
3436   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3437      determined from RTT */
3438   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3439   /* Divide the given buffer into packets for sending */
3440   for (packet=0; packet < num_needed_packets; packet++)
3441   {
3442     if ((packet + 1) * max_payload_size < size) 
3443     {
3444       payload_size = max_payload_size;
3445       packet_size = MAX_PACKET_SIZE;
3446     }
3447     else 
3448     {
3449       payload_size = size - packet * max_payload_size;
3450       packet_size =  payload_size + sizeof (struct
3451                                             GNUNET_STREAM_DataMessage); 
3452     }
3453     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3454     io_handle->messages[packet]->header.header.size = htons (packet_size);
3455     io_handle->messages[packet]->header.header.type =
3456       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3457     io_handle->messages[packet]->sequence_number =
3458       htonl (socket->write_sequence_number++);
3459     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3460
3461     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3462        determined from RTT */
3463     io_handle->messages[packet]->ack_deadline =
3464       GNUNET_TIME_relative_hton (ack_deadline);
3465     data_msg = io_handle->messages[packet];
3466     /* Copy data from given buffer to the packet */
3467     memcpy (&data_msg[1],
3468             sweep,
3469             payload_size);
3470     sweep += payload_size;
3471     socket->write_offset += payload_size;
3472   }
3473   socket->write_handle = io_handle;
3474   write_data (socket);
3475
3476   LOG (GNUNET_ERROR_TYPE_DEBUG,
3477        "%s() END\n", __func__);
3478
3479   return io_handle;
3480 }
3481
3482
3483 /**
3484  * Tries to read data from the stream.
3485  *
3486  * @param socket the socket representing a stream
3487  * @param timeout the timeout period
3488  * @param proc function to call with data (once only)
3489  * @param proc_cls the closure for proc
3490  *
3491  * @return handle to cancel the operation; if the stream has been shutdown for
3492  *           this type of opeartion then the DataProcessor is immediately
3493  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3494  */
3495 struct GNUNET_STREAM_IOReadHandle *
3496 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3497                     struct GNUNET_TIME_Relative timeout,
3498                     GNUNET_STREAM_DataProcessor proc,
3499                     void *proc_cls)
3500 {
3501   struct GNUNET_STREAM_IOReadHandle *read_handle;
3502   
3503   LOG (GNUNET_ERROR_TYPE_DEBUG,
3504        "%s: %s()\n", 
3505        GNUNET_i2s (&socket->other_peer),
3506        __func__);
3507   /* Return NULL if there is already a read handle; the user has to cancel that
3508      first before continuing or has to wait until it is completed */
3509   if (NULL != socket->read_handle) 
3510     return NULL;
3511   GNUNET_assert (NULL != proc);
3512   switch (socket->state)
3513   {
3514   case STATE_RECEIVE_CLOSED:
3515   case STATE_RECEIVE_CLOSE_WAIT:
3516   case STATE_CLOSED:
3517   case STATE_CLOSE_WAIT:
3518     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3519     LOG (GNUNET_ERROR_TYPE_DEBUG,
3520          "%s: %s() END\n",
3521          GNUNET_i2s (&socket->other_peer),
3522          __func__);
3523     return NULL;
3524   default:
3525     break;
3526   }
3527   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3528   read_handle->proc = proc;
3529   read_handle->proc_cls = proc_cls;
3530   read_handle->socket = socket;
3531   socket->read_handle = read_handle;
3532   /* Check if we have a packet at bitmap 0 */
3533   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3534                                           0))
3535   {
3536     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3537                                                      socket);   
3538   }
3539   /* Setup the read timeout task */
3540   socket->read_io_timeout_task_id =
3541     GNUNET_SCHEDULER_add_delayed (timeout,
3542                                   &read_io_timeout,
3543                                   socket);
3544   LOG (GNUNET_ERROR_TYPE_DEBUG,
3545        "%s: %s() END\n",
3546        GNUNET_i2s (&socket->other_peer),
3547        __func__);
3548   return read_handle;
3549 }
3550
3551
3552 /**
3553  * Cancel pending write operation.
3554  *
3555  * @param ioh handle to operation to cancel
3556  */
3557 void
3558 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3559 {
3560   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3561   unsigned int packet;
3562
3563   GNUNET_assert (NULL != socket->write_handle);
3564   GNUNET_assert (socket->write_handle == ioh);
3565
3566   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3567   {
3568     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3569     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3570   }
3571
3572   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3573   {
3574     if (NULL == ioh->messages[packet]) break;
3575     GNUNET_free (ioh->messages[packet]);
3576   }
3577       
3578   GNUNET_free (socket->write_handle);
3579   socket->write_handle = NULL;
3580 }
3581
3582
3583 /**
3584  * Cancel pending read operation.
3585  *
3586  * @param ioh handle to operation to cancel
3587  */
3588 void
3589 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3590 {
3591   struct GNUNET_STREAM_Socket *socket;
3592   
3593   socket = ioh->socket;
3594   GNUNET_assert (NULL != socket->read_handle);
3595   GNUNET_assert (ioh == socket->read_handle);
3596   /* Read io time task should be there; if it is already executed then this
3597   read handle is not valid */
3598   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
3599   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3600   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3601   /* reading task may be present; if so we have to stop it */
3602   if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
3603   {
3604     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3605     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3606   }
3607   GNUNET_free (ioh);
3608   socket->read_handle = NULL;
3609 }
3610
3611 /* end of stream_api.c */