3abea2a49f41afb0e1c7234ad4ff287a130d718a
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * Retransmission timeout
179    */
180   struct GNUNET_TIME_Relative retransmit_timeout;
181
182   /**
183    * The Acknowledgement Bitmap
184    */
185   GNUNET_STREAM_AckBitmap ack_bitmap;
186
187   /**
188    * Time when the Acknowledgement was queued
189    */
190   struct GNUNET_TIME_Absolute ack_time_registered;
191
192   /**
193    * Queued Acknowledgement deadline
194    */
195   struct GNUNET_TIME_Relative ack_time_deadline;
196
197   /**
198    * The mesh handle
199    */
200   struct GNUNET_MESH_Handle *mesh;
201
202   /**
203    * The mesh tunnel handle
204    */
205   struct GNUNET_MESH_Tunnel *tunnel;
206
207   /**
208    * Stream open closure
209    */
210   void *open_cls;
211
212   /**
213    * Stream open callback
214    */
215   GNUNET_STREAM_OpenCallback open_cb;
216
217   /**
218    * The current transmit handle (if a pending transmit request exists)
219    */
220   struct GNUNET_MESH_TransmitHandle *transmit_handle;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for retransmission task after timeout
265    */
266   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
267
268   /**
269    * Task identifier for retransmission of control messages
270    */
271   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * The state of the protocol associated with this socket
280    */
281   enum State state;
282
283   /**
284    * The status of the socket
285    */
286   enum GNUNET_STREAM_Status status;
287
288   /**
289    * The number of previous timeouts; FIXME: currently not used
290    */
291   unsigned int retries;
292
293   /**
294    * The application port number (type: uint32_t)
295    */
296   GNUNET_MESH_ApplicationType app_port;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * The write sequence number to be set incase of testing
305    */
306   uint32_t testing_set_write_sequence_number_value;
307
308   /**
309    * Write sequence number. Set to random when sending HELLO(client) and
310    * HELLO_ACK(server) 
311    */
312   uint32_t write_sequence_number;
313
314   /**
315    * Read sequence number. This number's value is determined during handshake
316    */
317   uint32_t read_sequence_number;
318
319   /**
320    * The receiver buffer size
321    */
322   uint32_t receive_buffer_size;
323
324   /**
325    * The receiver buffer boundaries
326    */
327   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
328
329   /**
330    * receiver's available buffer after the last acknowledged packet
331    */
332   uint32_t receiver_window_available;
333
334   /**
335    * The offset pointer used during write operation
336    */
337   uint32_t write_offset;
338
339   /**
340    * The offset after which we are expecting data
341    */
342   uint32_t read_offset;
343
344   /**
345    * The offset upto which user has read from the received buffer
346    */
347   uint32_t copy_offset;
348
349   /**
350    * The maximum size of the data message payload this stream handle can send
351    */
352   uint16_t max_payload_size;
353 };
354
355
356 /**
357  * A socket for listening
358  */
359 struct GNUNET_STREAM_ListenSocket
360 {
361   /**
362    * The mesh handle
363    */
364   struct GNUNET_MESH_Handle *mesh;
365
366   /**
367    * Our configuration
368    */
369   struct GNUNET_CONFIGURATION_Handle *cfg;
370
371   /**
372    * Handle to the lock manager service
373    */
374   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
375
376   /**
377    * The active LockingRequest from lockmanager
378    */
379   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
380
381   /**
382    * Callback to call after acquring a lock and listening
383    */
384   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
385
386   /**
387    * The callback function which is called after successful opening socket
388    */
389   GNUNET_STREAM_ListenCallback listen_cb;
390
391   /**
392    * The call back closure
393    */
394   void *listen_cb_cls;
395
396   /**
397    * The service port
398    */
399   GNUNET_MESH_ApplicationType port;
400   
401   /**
402    * The id of the lockmanager timeout task
403    */
404   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
405
406   /**
407    * The retransmit timeout
408    */
409   struct GNUNET_TIME_Relative retransmit_timeout;
410   
411   /**
412    * Listen enabled?
413    */
414   int listening;
415
416   /**
417    * Whether testing mode is active or not
418    */
419   int testing_active;
420
421   /**
422    * The write sequence number to be set incase of testing
423    */
424   uint32_t testing_set_write_sequence_number_value;
425
426   /**
427    * The maximum size of the data message payload this stream handle can send
428    */
429   uint16_t max_payload_size;
430
431 };
432
433
434 /**
435  * The IO Write Handle
436  */
437 struct GNUNET_STREAM_IOWriteHandle
438 {
439   /**
440    * The socket to which this write handle is associated
441    */
442   struct GNUNET_STREAM_Socket *socket;
443
444   /**
445    * The packet_buffers associated with this Handle
446    */
447   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
448
449   /**
450    * The write continuation callback
451    */
452   GNUNET_STREAM_CompletionContinuation write_cont;
453
454   /**
455    * Write continuation closure
456    */
457   void *write_cont_cls;
458
459   /**
460    * The bitmap of this IOHandle; Corresponding bit for a message is set when
461    * it has been acknowledged by the receiver
462    */
463   GNUNET_STREAM_AckBitmap ack_bitmap;
464
465   /**
466    * Number of bytes in this write handle
467    */
468   size_t size;
469
470   /**
471    * Number of packets already transmitted from this IO handle. Retransmitted
472    * packets are not taken into account here. This is used to determine which
473    * packets account for retransmission and which packets occupy buffer space at
474    * the receiver.
475    */
476   unsigned int packets_sent;
477 };
478
479
480 /**
481  * The IO Read Handle
482  */
483 struct GNUNET_STREAM_IOReadHandle
484 {
485   /**
486    * The socket to which this read handle is associated
487    */
488   struct GNUNET_STREAM_Socket *socket;
489   
490   /**
491    * Callback for the read processor
492    */
493   GNUNET_STREAM_DataProcessor proc;
494
495   /**
496    * The closure pointer for the read processor callback
497    */
498   void *proc_cls;
499
500   /**
501    * Task identifier for the read io timeout task
502    */
503   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
504
505   /**
506    * Task scheduled to continue a read operation.
507    */
508   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
509 };
510
511
512 /**
513  * Handle for Shutdown
514  */
515 struct GNUNET_STREAM_ShutdownHandle
516 {
517   /**
518    * The socket associated with this shutdown handle
519    */
520   struct GNUNET_STREAM_Socket *socket;
521
522   /**
523    * Shutdown completion callback
524    */
525   GNUNET_STREAM_ShutdownCompletion completion_cb;
526
527   /**
528    * Closure for completion callback
529    */
530   void *completion_cls;
531
532   /**
533    * Close message retransmission task id
534    */
535   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
536
537   /**
538    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
539    */
540   int operation;  
541 };
542
543
544 /**
545  * Default value in seconds for various timeouts
546  */
547 static const unsigned int default_timeout = 10;
548
549 /**
550  * The domain name for locks we use here
551  */
552 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
553
554
555 /**
556  * Callback function for sending queued message
557  *
558  * @param cls closure the socket
559  * @param size number of bytes available in buf
560  * @param buf where the callee should write the message
561  * @return number of bytes written to buf
562  */
563 static size_t
564 send_message_notify (void *cls, size_t size, void *buf)
565 {
566   struct GNUNET_STREAM_Socket *socket = cls;
567   struct MessageQueue *head;
568   size_t ret;
569
570   socket->transmit_handle = NULL; /* Remove the transmit handle */
571   head = socket->queue_head;
572   if (NULL == head)
573     return 0; /* just to be safe */
574   if (0 == size)                /* request timed out */
575   {
576     socket->retries++;
577     LOG (GNUNET_ERROR_TYPE_DEBUG,
578          "%s: Message sending timed out. Retry %d \n",
579          GNUNET_i2s (&socket->other_peer),
580          socket->retries);
581     socket->transmit_handle = 
582       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
583                                          GNUNET_NO, /* Corking */
584                                          /* FIXME: exponential backoff */
585                                          socket->retransmit_timeout,
586                                          &socket->other_peer,
587                                          ntohs (head->message->header.size),
588                                          &send_message_notify,
589                                          socket);
590     return 0;
591   }
592   ret = ntohs (head->message->header.size);
593   GNUNET_assert (size >= ret);
594   memcpy (buf, head->message, ret);
595   if (NULL != head->finish_cb)
596   {
597     head->finish_cb (head->finish_cb_cls, socket);
598   }
599   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
600                                socket->queue_tail,
601                                head);
602   GNUNET_free (head->message);
603   GNUNET_free (head);
604   head = socket->queue_head;
605   if (NULL != head)    /* more pending messages to send */
606   {
607     socket->retries = 0;
608     socket->transmit_handle = 
609       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
610                                          GNUNET_NO, /* Corking */
611                                          /* FIXME: exponential backoff */
612                                          socket->retransmit_timeout,
613                                          &socket->other_peer,
614                                          ntohs (head->message->header.size),
615                                          &send_message_notify,
616                                          socket);
617   }
618   return ret;
619 }
620
621
622 /**
623  * Queues a message for sending using the mesh connection of a socket
624  *
625  * @param socket the socket whose mesh connection is used
626  * @param message the message to be sent
627  * @param finish_cb the callback to be called when the message is sent
628  * @param finish_cb_cls the closure for the callback
629  * @param urgent set to GNUNET_YES to add the message to the beginning of the
630  *          queue; GNUNET_NO to add at the tail
631  */
632 static void
633 queue_message (struct GNUNET_STREAM_Socket *socket,
634                struct GNUNET_STREAM_MessageHeader *message,
635                SendFinishCallback finish_cb,
636                void *finish_cb_cls,
637                int urgent)
638 {
639   struct MessageQueue *queue_entity;
640
641   GNUNET_assert 
642     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
643      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
644   LOG (GNUNET_ERROR_TYPE_DEBUG,
645        "%s: Queueing message of type %d and size %d\n",
646        GNUNET_i2s (&socket->other_peer),
647        ntohs (message->header.type),
648        ntohs (message->header.size));
649   GNUNET_assert (NULL != message);
650   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
651   queue_entity->message = message;
652   queue_entity->finish_cb = finish_cb;
653   queue_entity->finish_cb_cls = finish_cb_cls;
654   if (GNUNET_YES == urgent)
655   {
656     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
657                                  queue_entity);
658     if (NULL != socket->transmit_handle)
659     {
660       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
661       socket->transmit_handle = NULL;
662     }
663   }
664   else
665     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
666                                       socket->queue_tail,
667                                       queue_entity);
668   if (NULL == socket->transmit_handle)
669   {
670     socket->retries = 0;
671     socket->transmit_handle = 
672         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
673                                            GNUNET_NO, /* Corking */
674                                            socket->retransmit_timeout,
675                                            &socket->other_peer,
676                                            ntohs (message->header.size),
677                                            &send_message_notify,
678                                            socket);
679   }
680 }
681
682
683 /**
684  * Copies a message and queues it for sending using the mesh connection of
685  * given socket 
686  *
687  * @param socket the socket whose mesh connection is used
688  * @param message the message to be sent
689  * @param finish_cb the callback to be called when the message is sent
690  * @param finish_cb_cls the closure for the callback
691  */
692 static void
693 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
694                         const struct GNUNET_STREAM_MessageHeader *message,
695                         SendFinishCallback finish_cb,
696                         void *finish_cb_cls)
697 {
698   struct GNUNET_STREAM_MessageHeader *msg_copy;
699   uint16_t size;
700   
701   size = ntohs (message->header.size);
702   msg_copy = GNUNET_malloc (size);
703   memcpy (msg_copy, message, size);
704   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
705 }
706
707
708 /**
709  * Writes data using the given socket. The amount of data written is limited by
710  * the receiver_window_size
711  *
712  * @param socket the socket to use
713  */
714 static void 
715 write_data (struct GNUNET_STREAM_Socket *socket);
716
717
718 /**
719  * Task for retransmitting data messages if they aren't ACK before their ack
720  * deadline 
721  *
722  * @param cls the socket
723  * @param tc the Task context
724  */
725 static void
726 data_retransmission_task (void *cls,
727                           const struct GNUNET_SCHEDULER_TaskContext *tc)
728 {
729   struct GNUNET_STREAM_Socket *socket = cls;
730   
731   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
732   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
733     return;
734   LOG (GNUNET_ERROR_TYPE_DEBUG,
735        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
736   write_data (socket);
737 }
738
739
740 /**
741  * Task for sending ACK message
742  *
743  * @param cls the socket
744  * @param tc the Task context
745  */
746 static void
747 ack_task (void *cls,
748           const struct GNUNET_SCHEDULER_TaskContext *tc)
749 {
750   struct GNUNET_STREAM_Socket *socket = cls;
751   struct GNUNET_STREAM_AckMessage *ack_msg;
752
753   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
754   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
755     return;
756   /* Create the ACK Message */
757   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
758   ack_msg->header.header.size = htons (sizeof (struct 
759                                                GNUNET_STREAM_AckMessage));
760   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
761   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
762   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
763   ack_msg->receive_window_remaining = 
764     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
765   /* Queue up ACK for immediate sending */
766   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
767 }
768
769
770 /**
771  * Retransmission task for shutdown messages
772  *
773  * @param cls the shutdown handle
774  * @param tc the Task Context
775  */
776 static void
777 close_msg_retransmission_task (void *cls,
778                                const struct GNUNET_SCHEDULER_TaskContext *tc)
779 {
780   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
781   struct GNUNET_STREAM_MessageHeader *msg;
782   struct GNUNET_STREAM_Socket *socket;
783
784   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
785   GNUNET_assert (NULL != shutdown_handle);
786   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
787     return;
788   socket = shutdown_handle->socket;
789   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
790   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
791   switch (shutdown_handle->operation)
792   {
793   case SHUT_RDWR:
794     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
795     break;
796   case SHUT_RD:
797     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
798     break;
799   case SHUT_WR:
800     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
801     break;
802   default:
803     GNUNET_free (msg);
804     shutdown_handle->close_msg_retransmission_task_id = 
805       GNUNET_SCHEDULER_NO_TASK;
806     return;
807   }
808   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
809   shutdown_handle->close_msg_retransmission_task_id =
810     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
811                                   &close_msg_retransmission_task,
812                                   shutdown_handle);
813 }
814
815
816 /**
817  * Function to modify a bit in GNUNET_STREAM_AckBitmap
818  *
819  * @param bitmap the bitmap to modify
820  * @param bit the bit number to modify
821  * @param value GNUNET_YES to on, GNUNET_NO to off
822  */
823 static void
824 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
825                       unsigned int bit, 
826                       int value)
827 {
828   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
829   if (GNUNET_YES == value)
830     *bitmap |= (1LL << bit);
831   else
832     *bitmap &= ~(1LL << bit);
833 }
834
835
836 /**
837  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
838  *
839  * @param bitmap address of the bitmap that has to be checked
840  * @param bit the bit number to check
841  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
842  */
843 static uint8_t
844 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
845                       unsigned int bit)
846 {
847   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
848   return 0 != (*bitmap & (1LL << bit));
849 }
850
851
852 /**
853  * Writes data using the given socket. The amount of data written is limited by
854  * the receiver_window_size
855  *
856  * @param socket the socket to use
857  */
858 static void 
859 write_data (struct GNUNET_STREAM_Socket *socket)
860 {
861   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
862   unsigned int packet;
863   
864   for (packet=0; packet < io_handle->packets_sent; packet++)
865   {
866     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
867                                            packet))
868     {
869       LOG (GNUNET_ERROR_TYPE_DEBUG,
870            "%s: Retransmitting DATA message with sequence %u\n",
871            GNUNET_i2s (&socket->other_peer),
872            ntohl (io_handle->messages[packet]->sequence_number));
873       copy_and_queue_message (socket,
874                               &io_handle->messages[packet]->header,
875                               NULL,
876                               NULL);
877     }
878   }
879   /* Now send new packets if there is enough buffer space */
880   while ( (NULL != io_handle->messages[packet]) &&
881           (socket->receiver_window_available 
882            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
883           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
884   {
885     socket->receiver_window_available -= 
886       ntohs (io_handle->messages[packet]->header.header.size);
887     LOG (GNUNET_ERROR_TYPE_DEBUG,
888          "%s: Placing DATA message with sequence %u in send queue\n",
889          GNUNET_i2s (&socket->other_peer),
890          ntohl (io_handle->messages[packet]->sequence_number));
891     copy_and_queue_message (socket,
892                             &io_handle->messages[packet]->header,
893                             NULL,
894                             NULL);
895     packet++;
896   }
897   io_handle->packets_sent = packet;
898   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
899   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
900     socket->data_retransmission_task_id = 
901       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
902                                     (GNUNET_TIME_UNIT_SECONDS, 8),
903                                     &data_retransmission_task,
904                                     socket);
905 }
906
907
908 /**
909  * Task for calling the read processor
910  *
911  * @param cls the socket
912  * @param tc the task context
913  */
914 static void
915 call_read_processor (void *cls,
916                      const struct GNUNET_SCHEDULER_TaskContext *tc)
917 {
918   struct GNUNET_STREAM_Socket *socket = cls;
919   struct GNUNET_STREAM_IOReadHandle *read_handle;
920   size_t read_size;
921   size_t valid_read_size;
922   unsigned int packet;
923   uint32_t sequence_increase;
924   uint32_t offset_increase;
925
926   read_handle = socket->read_handle;
927   GNUNET_assert (NULL != read_handle);
928   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
929   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
930     return;
931   if (NULL == socket->receive_buffer) 
932     return;
933   GNUNET_assert (NULL != socket->read_handle);
934   GNUNET_assert (NULL != socket->read_handle->proc);
935   /* Check the bitmap for any holes */
936   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
937   {
938     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
939                                            packet))
940       break;
941   }
942   /* We only call read processor if we have the first packet */
943   GNUNET_assert (0 < packet);
944   valid_read_size = 
945     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
946   GNUNET_assert (0 != valid_read_size);
947   /* Cancel the read_io_timeout_task */
948   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
949   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
950   /* Call the data processor */
951   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
952        GNUNET_i2s (&socket->other_peer));
953   read_size = 
954       socket->read_handle->proc (socket->read_handle->proc_cls,
955                                  socket->status,
956                                  socket->receive_buffer + socket->copy_offset,
957                                  valid_read_size);
958   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
959        GNUNET_i2s (&socket->other_peer), read_size);
960   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
961        GNUNET_i2s (&socket->other_peer));
962   /* Free the read handle */
963   GNUNET_free (socket->read_handle);
964   socket->read_handle = NULL;
965   GNUNET_assert (read_size <= valid_read_size);
966   socket->copy_offset += read_size;
967   /* Determine upto which packet we can remove from the buffer */
968   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
969   {
970     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
971     { packet++; break; }
972     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
973       break;
974   }
975   /* If no packets can be removed we can't move the buffer */
976   if (0 == packet) 
977     return;
978   sequence_increase = packet;
979   LOG (GNUNET_ERROR_TYPE_DEBUG,
980        "%s: Sequence increase after read processor completion: %u\n",
981        GNUNET_i2s (&socket->other_peer), sequence_increase);
982   /* Shift the data in the receive buffer */
983   socket->receive_buffer = 
984     memmove (socket->receive_buffer,
985              socket->receive_buffer 
986              + socket->receive_buffer_boundaries[sequence_increase-1],
987              socket->receive_buffer_size
988              - socket->receive_buffer_boundaries[sequence_increase-1]);
989   /* Shift the bitmap */
990   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
991   /* Set read_sequence_number */
992   socket->read_sequence_number += sequence_increase;
993   /* Set read_offset */
994   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
995   socket->read_offset += offset_increase;
996   /* Fix copy_offset */
997   GNUNET_assert (offset_increase <= socket->copy_offset);
998   socket->copy_offset -= offset_increase;
999   /* Fix relative boundaries */
1000   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1001   {
1002     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1003     {
1004       uint32_t ahead_buffer_boundary;
1005
1006       ahead_buffer_boundary = 
1007         socket->receive_buffer_boundaries[packet + sequence_increase];
1008       if (0 == ahead_buffer_boundary)
1009         socket->receive_buffer_boundaries[packet] = 0;
1010       else
1011       {
1012         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1013         socket->receive_buffer_boundaries[packet] = 
1014           ahead_buffer_boundary - offset_increase;
1015       }
1016     }
1017     else
1018       socket->receive_buffer_boundaries[packet] = 0;
1019   }
1020 }
1021
1022
1023 /**
1024  * Cancels the existing read io handle
1025  *
1026  * @param cls the closure from the SCHEDULER call
1027  * @param tc the task context
1028  */
1029 static void
1030 read_io_timeout (void *cls,
1031                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1032 {
1033   struct GNUNET_STREAM_Socket *socket = cls;
1034   struct GNUNET_STREAM_IOReadHandle *read_handle;
1035   GNUNET_STREAM_DataProcessor proc;
1036   void *proc_cls;
1037
1038   read_handle = socket->read_handle;
1039   GNUNET_assert (NULL != read_handle);
1040   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1041   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1042     return;
1043   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1044   {
1045     LOG (GNUNET_ERROR_TYPE_DEBUG,
1046          "%s: Read task timedout - Cancelling it\n",
1047          GNUNET_i2s (&socket->other_peer));
1048     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1049     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1050   }
1051   proc = read_handle->proc;
1052   proc_cls = read_handle->proc_cls;
1053   GNUNET_free (read_handle);
1054   socket->read_handle = NULL;
1055   /* Call the read processor to signal timeout */
1056   proc (proc_cls,
1057         GNUNET_STREAM_TIMEOUT,
1058         NULL,
1059         0);
1060 }
1061
1062
1063 /**
1064  * Handler for DATA messages; Same for both client and server
1065  *
1066  * @param socket the socket through which the ack was received
1067  * @param tunnel connection to the other end
1068  * @param sender who sent the message
1069  * @param msg the data message
1070  * @param atsi performance data for the connection
1071  * @return GNUNET_OK to keep the connection open,
1072  *         GNUNET_SYSERR to close it (signal serious error)
1073  */
1074 static int
1075 handle_data (struct GNUNET_STREAM_Socket *socket,
1076              struct GNUNET_MESH_Tunnel *tunnel,
1077              const struct GNUNET_PeerIdentity *sender,
1078              const struct GNUNET_STREAM_DataMessage *msg,
1079              const struct GNUNET_ATS_Information*atsi)
1080 {
1081   const void *payload;
1082   struct GNUNET_TIME_Relative ack_deadline_rel;
1083   uint32_t bytes_needed;
1084   uint32_t relative_offset;
1085   uint32_t relative_sequence_number;
1086   uint16_t size;
1087
1088   size = htons (msg->header.header.size);
1089   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1090   {
1091     GNUNET_break_op (0);
1092     return GNUNET_SYSERR;
1093   }
1094   if (0 != memcmp (sender, &socket->other_peer,
1095                    sizeof (struct GNUNET_PeerIdentity)))
1096   {
1097     LOG (GNUNET_ERROR_TYPE_DEBUG,
1098          "%s: Received DATA from non-confirming peer\n",
1099          GNUNET_i2s (&socket->other_peer));
1100     return GNUNET_YES;
1101   }
1102   switch (socket->state)
1103   {
1104   case STATE_ESTABLISHED:
1105   case STATE_TRANSMIT_CLOSED:
1106   case STATE_TRANSMIT_CLOSE_WAIT:      
1107     /* check if the message's sequence number is in the range we are
1108        expecting */
1109     relative_sequence_number = 
1110       ntohl (msg->sequence_number) - socket->read_sequence_number;
1111     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1112     {
1113       LOG (GNUNET_ERROR_TYPE_DEBUG,
1114            "%s: Ignoring received message with sequence number %u\n",
1115            GNUNET_i2s (&socket->other_peer),
1116            ntohl (msg->sequence_number));
1117       /* Start ACK sending task if one is not already present */
1118       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1119       {
1120         socket->ack_task_id = 
1121           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1122                                         (msg->ack_deadline),
1123                                         &ack_task,
1124                                         socket);
1125       }
1126       return GNUNET_YES;
1127     }      
1128     /* Check if we have already seen this message */
1129     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1130                                             relative_sequence_number))
1131     {
1132       LOG (GNUNET_ERROR_TYPE_DEBUG,
1133            "%s: Ignoring already received message with sequence number %u\n",
1134            GNUNET_i2s (&socket->other_peer),
1135            ntohl (msg->sequence_number));
1136       /* Start ACK sending task if one is not already present */
1137       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1138       {
1139         socket->ack_task_id = 
1140           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1141                                         (msg->ack_deadline), &ack_task, socket);
1142       }
1143       return GNUNET_YES;
1144     }
1145     LOG (GNUNET_ERROR_TYPE_DEBUG,
1146          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1147          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1148          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1149     /* Check if we have to allocate the buffer */
1150     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1151     relative_offset = ntohl (msg->offset) - socket->read_offset;
1152     bytes_needed = relative_offset + size;
1153     if (bytes_needed > socket->receive_buffer_size)
1154     {
1155       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1156       {
1157         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1158                                                  bytes_needed);
1159         socket->receive_buffer_size = bytes_needed;
1160       }
1161       else
1162       {
1163         LOG (GNUNET_ERROR_TYPE_DEBUG,
1164              "%s: Cannot accommodate packet %d as buffer is full\n",
1165              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1166         return GNUNET_YES;
1167       }
1168     }
1169     /* Copy Data to buffer */
1170     payload = &msg[1];
1171     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1172     memcpy (socket->receive_buffer + relative_offset, payload, size);
1173     socket->receive_buffer_boundaries[relative_sequence_number] = 
1174         relative_offset + size;
1175     /* Modify the ACK bitmap */
1176     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1177                           GNUNET_YES);
1178     /* Start ACK sending task if one is not already present */
1179     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1180     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1181     {
1182       ack_deadline_rel = 
1183           GNUNET_TIME_relative_min (ack_deadline_rel,
1184                                     GNUNET_TIME_relative_multiply
1185                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1186       socket->ack_task_id = 
1187           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1188                                         (msg->ack_deadline), &ack_task, socket);
1189       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1190       socket->ack_time_deadline = ack_deadline_rel;
1191     }
1192     else
1193     {
1194       struct GNUNET_TIME_Relative ack_time_past;
1195       struct GNUNET_TIME_Relative ack_time_remaining;
1196       struct GNUNET_TIME_Relative ack_time_min;
1197       ack_time_past = 
1198           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1199       ack_time_remaining = GNUNET_TIME_relative_subtract
1200           (socket->ack_time_deadline, ack_time_past);
1201       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1202                                                ack_deadline_rel);
1203       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1204                       sizeof (struct GNUNET_TIME_Relative)))
1205       {
1206         ack_deadline_rel = ack_time_min;
1207         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1208         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1209                                                             &ack_task, socket);
1210         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1211         socket->ack_time_deadline = ack_deadline_rel;
1212       }
1213     }
1214     if ((NULL != socket->read_handle) /* A read handle is waiting */
1215         /* There is no current read task */
1216         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1217         /* We have the first packet */
1218         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1219     {
1220       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1221            GNUNET_i2s (&socket->other_peer));
1222       socket->read_handle->read_task_id =
1223           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1224     }
1225     break;
1226   default:
1227     LOG (GNUNET_ERROR_TYPE_DEBUG,
1228          "%s: Received data message when it cannot be handled\n",
1229          GNUNET_i2s (&socket->other_peer));
1230     break;
1231   }
1232   return GNUNET_YES;
1233 }
1234
1235
1236 /**
1237  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1238  *
1239  * @param cls the socket (set from GNUNET_MESH_connect)
1240  * @param tunnel connection to the other end
1241  * @param tunnel_ctx place to store local state associated with the tunnel
1242  * @param sender who sent the message
1243  * @param message the actual message
1244  * @param atsi performance data for the connection
1245  * @return GNUNET_OK to keep the connection open,
1246  *         GNUNET_SYSERR to close it (signal serious error)
1247  */
1248 static int
1249 client_handle_data (void *cls,
1250                     struct GNUNET_MESH_Tunnel *tunnel,
1251                     void **tunnel_ctx,
1252                     const struct GNUNET_PeerIdentity *sender,
1253                     const struct GNUNET_MessageHeader *message,
1254                     const struct GNUNET_ATS_Information*atsi)
1255 {
1256   struct GNUNET_STREAM_Socket *socket = cls;
1257
1258   return handle_data (socket, tunnel, sender, 
1259                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1260 }
1261
1262
1263 /**
1264  * Callback to set state to ESTABLISHED
1265  *
1266  * @param cls the closure NULL;
1267  * @param socket the socket to requiring state change
1268  */
1269 static void
1270 set_state_established (void *cls,
1271                        struct GNUNET_STREAM_Socket *socket)
1272 {
1273   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1274        "%s: Attaining ESTABLISHED state\n",
1275        GNUNET_i2s (&socket->other_peer));
1276   socket->write_offset = 0;
1277   socket->read_offset = 0;
1278   socket->state = STATE_ESTABLISHED;
1279   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1280                  socket->control_retransmission_task_id);
1281   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1282   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1283   if (NULL != socket->lsocket)
1284   {
1285     LOG (GNUNET_ERROR_TYPE_DEBUG,
1286          "%s: Calling listen callback\n",
1287          GNUNET_i2s (&socket->other_peer));
1288     if (GNUNET_SYSERR == 
1289         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1290                                     socket,
1291                                     &socket->other_peer))
1292     {
1293       socket->state = STATE_CLOSED;
1294       /* FIXME: We should close in a decent way (send RST) */
1295       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1296       GNUNET_free (socket);
1297     }
1298   }
1299   else
1300     socket->open_cb (socket->open_cls, socket);
1301 }
1302
1303
1304 /**
1305  * Callback to set state to HELLO_WAIT
1306  *
1307  * @param cls the closure from queue_message
1308  * @param socket the socket to requiring state change
1309  */
1310 static void
1311 set_state_hello_wait (void *cls,
1312                       struct GNUNET_STREAM_Socket *socket)
1313 {
1314   GNUNET_assert (STATE_INIT == socket->state);
1315   LOG (GNUNET_ERROR_TYPE_DEBUG,
1316        "%s: Attaining HELLO_WAIT state\n",
1317        GNUNET_i2s (&socket->other_peer));
1318   socket->state = STATE_HELLO_WAIT;
1319 }
1320
1321
1322 /**
1323  * Callback to set state to CLOSE_WAIT
1324  *
1325  * @param cls the closure from queue_message
1326  * @param socket the socket requiring state change
1327  */
1328 static void
1329 set_state_close_wait (void *cls,
1330                       struct GNUNET_STREAM_Socket *socket)
1331 {
1332   LOG (GNUNET_ERROR_TYPE_DEBUG,
1333        "%s: Attaing CLOSE_WAIT state\n",
1334        GNUNET_i2s (&socket->other_peer));
1335   socket->state = STATE_CLOSE_WAIT;
1336   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1337   socket->receive_buffer = NULL;
1338   socket->receive_buffer_size = 0;
1339 }
1340
1341
1342 /**
1343  * Callback to set state to RECEIVE_CLOSE_WAIT
1344  *
1345  * @param cls the closure from queue_message
1346  * @param socket the socket requiring state change
1347  */
1348 static void
1349 set_state_receive_close_wait (void *cls,
1350                               struct GNUNET_STREAM_Socket *socket)
1351 {
1352   LOG (GNUNET_ERROR_TYPE_DEBUG,
1353        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1354        GNUNET_i2s (&socket->other_peer));
1355   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1356   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1357   socket->receive_buffer = NULL;
1358   socket->receive_buffer_size = 0;
1359 }
1360
1361
1362 /**
1363  * Callback to set state to TRANSMIT_CLOSE_WAIT
1364  *
1365  * @param cls the closure from queue_message
1366  * @param socket the socket requiring state change
1367  */
1368 static void
1369 set_state_transmit_close_wait (void *cls,
1370                                struct GNUNET_STREAM_Socket *socket)
1371 {
1372   LOG (GNUNET_ERROR_TYPE_DEBUG,
1373        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1374        GNUNET_i2s (&socket->other_peer));
1375   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1376 }
1377
1378
1379 /**
1380  * Callback to set state to CLOSED
1381  *
1382  * @param cls the closure from queue_message
1383  * @param socket the socket requiring state change
1384  */
1385 static void
1386 set_state_closed (void *cls,
1387                   struct GNUNET_STREAM_Socket *socket)
1388 {
1389   socket->state = STATE_CLOSED;
1390 }
1391
1392
1393 /**
1394  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1395  *
1396  * @return the generate hello message
1397  */
1398 static struct GNUNET_STREAM_MessageHeader *
1399 generate_hello (void)
1400 {
1401   struct GNUNET_STREAM_MessageHeader *msg;
1402
1403   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1404   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1405   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1406   return msg;
1407 }
1408
1409
1410 /**
1411  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1412  * socket
1413  *
1414  * @param socket the socket for which this HelloAckMessage has to be generated
1415  * @param generate_seq GNUNET_YES to generate the write sequence number,
1416  *          GNUNET_NO to use the existing sequence number
1417  * @return the HelloAckMessage
1418  */
1419 static struct GNUNET_STREAM_HelloAckMessage *
1420 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1421                     int generate_seq)
1422 {
1423   struct GNUNET_STREAM_HelloAckMessage *msg;
1424
1425   if (GNUNET_YES == generate_seq)
1426   {
1427     if (GNUNET_YES == socket->testing_active)
1428       socket->write_sequence_number =
1429         socket->testing_set_write_sequence_number_value;
1430     else
1431       socket->write_sequence_number = 
1432         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1433     LOG_DEBUG ("%s: write sequence number %u\n",
1434                GNUNET_i2s (&socket->other_peer),
1435                (unsigned int) socket->write_sequence_number);
1436   }
1437   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1438   msg->header.header.size = 
1439     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1440   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1441   msg->sequence_number = htonl (socket->write_sequence_number);
1442   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1443   return msg;
1444 }
1445
1446
1447 /**
1448  * Task for retransmitting control messages if they aren't ACK'ed before a
1449  * deadline
1450  *
1451  * @param cls the socket
1452  * @param tc the Task context
1453  */
1454 static void
1455 control_retransmission_task (void *cls,
1456                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1457 {
1458   struct GNUNET_STREAM_Socket *socket = cls;
1459     
1460   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1461   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1462     return;
1463   LOG_DEBUG ("%s: Retransmitting a control message\n",
1464                  GNUNET_i2s (&socket->other_peer));
1465   switch (socket->state)
1466   {
1467   case STATE_INIT:    
1468     GNUNET_break (0);
1469     break;
1470   case STATE_LISTEN:
1471     GNUNET_break (0);
1472     break;
1473   case STATE_HELLO_WAIT:
1474     if (NULL == socket->lsocket) /* We are client */
1475       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1476     else
1477       queue_message (socket,
1478                      (struct GNUNET_STREAM_MessageHeader *)
1479                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1480                      GNUNET_NO);
1481     socket->control_retransmission_task_id =
1482     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1483                                   &control_retransmission_task, socket);
1484     break;
1485   case STATE_ESTABLISHED:
1486     if (NULL == socket->lsocket)
1487       queue_message (socket,
1488                      (struct GNUNET_STREAM_MessageHeader *)
1489                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1490                      GNUNET_NO);
1491     else
1492       GNUNET_break (0);
1493   default:
1494     GNUNET_break (0);
1495   }  
1496 }
1497
1498
1499 /**
1500  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1501  *
1502  * @param cls the socket (set from GNUNET_MESH_connect)
1503  * @param tunnel connection to the other end
1504  * @param tunnel_ctx this is NULL
1505  * @param sender who sent the message
1506  * @param message the actual message
1507  * @param atsi performance data for the connection
1508  * @return GNUNET_OK to keep the connection open,
1509  *         GNUNET_SYSERR to close it (signal serious error)
1510  */
1511 static int
1512 client_handle_hello_ack (void *cls,
1513                          struct GNUNET_MESH_Tunnel *tunnel,
1514                          void **tunnel_ctx,
1515                          const struct GNUNET_PeerIdentity *sender,
1516                          const struct GNUNET_MessageHeader *message,
1517                          const struct GNUNET_ATS_Information*atsi)
1518 {
1519   struct GNUNET_STREAM_Socket *socket = cls;
1520   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1521   struct GNUNET_STREAM_HelloAckMessage *reply;
1522
1523   if (0 != memcmp (sender, &socket->other_peer,
1524                    sizeof (struct GNUNET_PeerIdentity)))
1525   {
1526     LOG (GNUNET_ERROR_TYPE_DEBUG,
1527          "%s: Received HELLO_ACK from non-confirming peer\n",
1528          GNUNET_i2s (&socket->other_peer));
1529     return GNUNET_YES;
1530   }
1531   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1532   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1533        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1534   GNUNET_assert (socket->tunnel == tunnel);
1535   switch (socket->state)
1536   {
1537   case STATE_HELLO_WAIT:
1538     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1539     LOG (GNUNET_ERROR_TYPE_DEBUG,
1540          "%s: Read sequence number %u\n",
1541          GNUNET_i2s (&socket->other_peer),
1542          (unsigned int) socket->read_sequence_number);
1543     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1544     reply = generate_hello_ack (socket, GNUNET_YES);
1545     queue_message (socket, &reply->header, &set_state_established,
1546                    NULL, GNUNET_NO);    
1547     return GNUNET_OK;
1548   case STATE_ESTABLISHED:
1549     // call statistics (# ACKs ignored++)
1550     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1551                    socket->control_retransmission_task_id);
1552     socket->control_retransmission_task_id =
1553       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1554     return GNUNET_OK;
1555   default:
1556     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1557                GNUNET_i2s (&socket->other_peer),
1558                GNUNET_i2s (&socket->other_peer), socket->state);
1559     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1560     return GNUNET_SYSERR;
1561   }
1562 }
1563
1564
1565 /**
1566  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1567  *
1568  * @param cls the socket (set from GNUNET_MESH_connect)
1569  * @param tunnel connection to the other end
1570  * @param tunnel_ctx this is NULL
1571  * @param sender who sent the message
1572  * @param message the actual message
1573  * @param atsi performance data for the connection
1574  * @return GNUNET_OK to keep the connection open,
1575  *         GNUNET_SYSERR to close it (signal serious error)
1576  */
1577 static int
1578 client_handle_reset (void *cls,
1579                      struct GNUNET_MESH_Tunnel *tunnel,
1580                      void **tunnel_ctx,
1581                      const struct GNUNET_PeerIdentity *sender,
1582                      const struct GNUNET_MessageHeader *message,
1583                      const struct GNUNET_ATS_Information*atsi)
1584 {
1585   // struct GNUNET_STREAM_Socket *socket = cls;
1586
1587   return GNUNET_OK;
1588 }
1589
1590
1591 /**
1592  * Common message handler for handling TRANSMIT_CLOSE messages
1593  *
1594  * @param socket the socket through which the ack was received
1595  * @param tunnel connection to the other end
1596  * @param sender who sent the message
1597  * @param msg the transmit close message
1598  * @param atsi performance data for the connection
1599  * @return GNUNET_OK to keep the connection open,
1600  *         GNUNET_SYSERR to close it (signal serious error)
1601  */
1602 static int
1603 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1604                        struct GNUNET_MESH_Tunnel *tunnel,
1605                        const struct GNUNET_PeerIdentity *sender,
1606                        const struct GNUNET_STREAM_MessageHeader *msg,
1607                        const struct GNUNET_ATS_Information*atsi)
1608 {
1609   struct GNUNET_STREAM_MessageHeader *reply;
1610
1611   switch (socket->state)
1612   {
1613   case STATE_ESTABLISHED:
1614     socket->state = STATE_RECEIVE_CLOSED;
1615     /* Send TRANSMIT_CLOSE_ACK */
1616     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1617     reply->header.type = 
1618       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1619     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1620     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1621     break;
1622   default:
1623     /* FIXME: Call statistics? */
1624     break;
1625   }
1626   return GNUNET_YES;
1627 }
1628
1629
1630 /**
1631  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1632  *
1633  * @param cls the socket (set from GNUNET_MESH_connect)
1634  * @param tunnel connection to the other end
1635  * @param tunnel_ctx this is NULL
1636  * @param sender who sent the message
1637  * @param message the actual message
1638  * @param atsi performance data for the connection
1639  * @return GNUNET_OK to keep the connection open,
1640  *         GNUNET_SYSERR to close it (signal serious error)
1641  */
1642 static int
1643 client_handle_transmit_close (void *cls,
1644                               struct GNUNET_MESH_Tunnel *tunnel,
1645                               void **tunnel_ctx,
1646                               const struct GNUNET_PeerIdentity *sender,
1647                               const struct GNUNET_MessageHeader *message,
1648                               const struct GNUNET_ATS_Information*atsi)
1649 {
1650   struct GNUNET_STREAM_Socket *socket = cls;
1651   
1652   return handle_transmit_close (socket,
1653                                 tunnel,
1654                                 sender,
1655                                 (struct GNUNET_STREAM_MessageHeader *)message,
1656                                 atsi);
1657 }
1658
1659
1660 /**
1661  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1662  *
1663  * @param socket the socket
1664  * @param tunnel connection to the other end
1665  * @param sender who sent the message
1666  * @param message the actual message
1667  * @param atsi performance data for the connection
1668  * @param operation the close operation which is being ACK'ed
1669  * @return GNUNET_OK to keep the connection open,
1670  *         GNUNET_SYSERR to close it (signal serious error)
1671  */
1672 static int
1673 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1674                           struct GNUNET_MESH_Tunnel *tunnel,
1675                           const struct GNUNET_PeerIdentity *sender,
1676                           const struct GNUNET_STREAM_MessageHeader *message,
1677                           const struct GNUNET_ATS_Information *atsi,
1678                           int operation)
1679 {
1680   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1681
1682   shutdown_handle = socket->shutdown_handle;
1683   if (NULL == shutdown_handle)
1684   {
1685     LOG (GNUNET_ERROR_TYPE_DEBUG,
1686          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1687          GNUNET_i2s (&socket->other_peer));
1688     return GNUNET_OK;
1689   }
1690   switch (operation)
1691   {
1692   case SHUT_RDWR:
1693     switch (socket->state)
1694     {
1695     case STATE_CLOSE_WAIT:
1696       if (SHUT_RDWR != shutdown_handle->operation)
1697       {
1698         LOG (GNUNET_ERROR_TYPE_DEBUG,
1699              "%s: Received CLOSE_ACK when shutdown handle is not for "
1700              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1701         return GNUNET_OK;
1702       }
1703       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1704            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1705       socket->state = STATE_CLOSED;
1706       break;
1707     default:
1708       LOG (GNUNET_ERROR_TYPE_DEBUG,
1709            "%s: Received CLOSE_ACK when in it not expected\n",
1710            GNUNET_i2s (&socket->other_peer));
1711       return GNUNET_OK;
1712     }
1713     break;
1714   case SHUT_RD:
1715     switch (socket->state)
1716     {
1717     case STATE_RECEIVE_CLOSE_WAIT:
1718       if (SHUT_RD != shutdown_handle->operation)
1719       {
1720         LOG (GNUNET_ERROR_TYPE_DEBUG,
1721              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1722              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1723         return GNUNET_OK;
1724       }
1725       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1726            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1727       socket->state = STATE_RECEIVE_CLOSED;
1728       break;
1729     default:
1730       LOG (GNUNET_ERROR_TYPE_DEBUG,
1731            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1732            GNUNET_i2s (&socket->other_peer));
1733       return GNUNET_OK;
1734     }
1735     break;
1736   case SHUT_WR:
1737     switch (socket->state)
1738     {
1739     case STATE_TRANSMIT_CLOSE_WAIT:
1740       if (SHUT_WR != shutdown_handle->operation)
1741       {
1742         LOG (GNUNET_ERROR_TYPE_DEBUG,
1743              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1744              "is not for SHUT_WR\n",
1745              GNUNET_i2s (&socket->other_peer));
1746         return GNUNET_OK;
1747       }
1748       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1749            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1750       socket->state = STATE_TRANSMIT_CLOSED;
1751       break;
1752     default:
1753       LOG (GNUNET_ERROR_TYPE_DEBUG,
1754            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1755            GNUNET_i2s (&socket->other_peer));          
1756       return GNUNET_OK;
1757     }
1758     break;
1759   default:
1760     GNUNET_assert (0);
1761   }
1762   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1763     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1764                                    operation);
1765   if (GNUNET_SCHEDULER_NO_TASK
1766       != shutdown_handle->close_msg_retransmission_task_id)
1767   {
1768     GNUNET_SCHEDULER_cancel
1769       (shutdown_handle->close_msg_retransmission_task_id);
1770     shutdown_handle->close_msg_retransmission_task_id =
1771         GNUNET_SCHEDULER_NO_TASK;
1772   }
1773   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1774   socket->shutdown_handle = NULL;
1775   return GNUNET_OK;
1776 }
1777
1778
1779 /**
1780  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1781  *
1782  * @param cls the socket (set from GNUNET_MESH_connect)
1783  * @param tunnel connection to the other end
1784  * @param tunnel_ctx this is NULL
1785  * @param sender who sent the message
1786  * @param message the actual message
1787  * @param atsi performance data for the connection
1788  * @return GNUNET_OK to keep the connection open,
1789  *         GNUNET_SYSERR to close it (signal serious error)
1790  */
1791 static int
1792 client_handle_transmit_close_ack (void *cls,
1793                                   struct GNUNET_MESH_Tunnel *tunnel,
1794                                   void **tunnel_ctx,
1795                                   const struct GNUNET_PeerIdentity *sender,
1796                                   const struct GNUNET_MessageHeader *message,
1797                                   const struct GNUNET_ATS_Information*atsi)
1798 {
1799   struct GNUNET_STREAM_Socket *socket = cls;
1800
1801   return handle_generic_close_ack (socket,
1802                                    tunnel,
1803                                    sender,
1804                                    (const struct GNUNET_STREAM_MessageHeader *)
1805                                    message,
1806                                    atsi,
1807                                    SHUT_WR);
1808 }
1809
1810
1811 /**
1812  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1813  *
1814  * @param socket the socket
1815  * @param tunnel connection to the other end
1816  * @param sender who sent the message
1817  * @param message the actual message
1818  * @param atsi performance data for the connection
1819  * @return GNUNET_OK to keep the connection open,
1820  *         GNUNET_SYSERR to close it (signal serious error)
1821  */
1822 static int
1823 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1824                       struct GNUNET_MESH_Tunnel *tunnel,
1825                       const struct GNUNET_PeerIdentity *sender,
1826                       const struct GNUNET_STREAM_MessageHeader *message,
1827                       const struct GNUNET_ATS_Information *atsi)
1828 {
1829   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1830
1831   switch (socket->state)
1832   {
1833   case STATE_INIT:
1834   case STATE_LISTEN:
1835   case STATE_HELLO_WAIT:
1836     LOG (GNUNET_ERROR_TYPE_DEBUG,
1837          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1838          GNUNET_i2s (&socket->other_peer));
1839     return GNUNET_OK;
1840   default:
1841     break;
1842   }
1843   
1844   LOG (GNUNET_ERROR_TYPE_DEBUG,
1845        "%s: Received RECEIVE_CLOSE from %s\n",
1846        GNUNET_i2s (&socket->other_peer),
1847        GNUNET_i2s (&socket->other_peer));
1848   receive_close_ack =
1849     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1850   receive_close_ack->header.size =
1851     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1852   receive_close_ack->header.type =
1853     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1854   queue_message (socket, receive_close_ack, &set_state_closed,
1855                  NULL, GNUNET_NO);  
1856   /* FIXME: Handle the case where write handle is present; the write operation
1857      should be deemed as finised and the write continuation callback
1858      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1859   return GNUNET_OK;
1860 }
1861
1862
1863 /**
1864  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1865  *
1866  * @param cls the socket (set from GNUNET_MESH_connect)
1867  * @param tunnel connection to the other end
1868  * @param tunnel_ctx this is NULL
1869  * @param sender who sent the message
1870  * @param message the actual message
1871  * @param atsi performance data for the connection
1872  * @return GNUNET_OK to keep the connection open,
1873  *         GNUNET_SYSERR to close it (signal serious error)
1874  */
1875 static int
1876 client_handle_receive_close (void *cls,
1877                              struct GNUNET_MESH_Tunnel *tunnel,
1878                              void **tunnel_ctx,
1879                              const struct GNUNET_PeerIdentity *sender,
1880                              const struct GNUNET_MessageHeader *message,
1881                              const struct GNUNET_ATS_Information*atsi)
1882 {
1883   struct GNUNET_STREAM_Socket *socket = cls;
1884
1885   return
1886     handle_receive_close (socket,
1887                           tunnel,
1888                           sender,
1889                           (const struct GNUNET_STREAM_MessageHeader *) message,
1890                           atsi);
1891 }
1892
1893
1894 /**
1895  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1896  *
1897  * @param cls the socket (set from GNUNET_MESH_connect)
1898  * @param tunnel connection to the other end
1899  * @param tunnel_ctx this is NULL
1900  * @param sender who sent the message
1901  * @param message the actual message
1902  * @param atsi performance data for the connection
1903  * @return GNUNET_OK to keep the connection open,
1904  *         GNUNET_SYSERR to close it (signal serious error)
1905  */
1906 static int
1907 client_handle_receive_close_ack (void *cls,
1908                                  struct GNUNET_MESH_Tunnel *tunnel,
1909                                  void **tunnel_ctx,
1910                                  const struct GNUNET_PeerIdentity *sender,
1911                                  const struct GNUNET_MessageHeader *message,
1912                                  const struct GNUNET_ATS_Information*atsi)
1913 {
1914   struct GNUNET_STREAM_Socket *socket = cls;
1915
1916   return handle_generic_close_ack (socket,
1917                                    tunnel,
1918                                    sender,
1919                                    (const struct GNUNET_STREAM_MessageHeader *)
1920                                    message,
1921                                    atsi,
1922                                    SHUT_RD);
1923 }
1924
1925
1926 /**
1927  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1928  *
1929  * @param socket the socket
1930  * @param tunnel connection to the other end
1931  * @param sender who sent the message
1932  * @param message the actual message
1933  * @param atsi performance data for the connection
1934  * @return GNUNET_OK to keep the connection open,
1935  *         GNUNET_SYSERR to close it (signal serious error)
1936  */
1937 static int
1938 handle_close (struct GNUNET_STREAM_Socket *socket,
1939               struct GNUNET_MESH_Tunnel *tunnel,
1940               const struct GNUNET_PeerIdentity *sender,
1941               const struct GNUNET_STREAM_MessageHeader *message,
1942               const struct GNUNET_ATS_Information*atsi)
1943 {
1944   struct GNUNET_STREAM_MessageHeader *close_ack;
1945
1946   switch (socket->state)
1947   {
1948   case STATE_INIT:
1949   case STATE_LISTEN:
1950   case STATE_HELLO_WAIT:
1951     LOG (GNUNET_ERROR_TYPE_DEBUG,
1952          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1953          GNUNET_i2s (&socket->other_peer));
1954     return GNUNET_OK;
1955   default:
1956     break;
1957   }
1958
1959   LOG (GNUNET_ERROR_TYPE_DEBUG,
1960        "%s: Received CLOSE from %s\n",
1961        GNUNET_i2s (&socket->other_peer),
1962        GNUNET_i2s (&socket->other_peer));
1963   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1964   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1965   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1966   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1967   if (socket->state == STATE_CLOSED)
1968     return GNUNET_OK;
1969
1970   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1971   socket->receive_buffer = NULL;
1972   socket->receive_buffer_size = 0;
1973   return GNUNET_OK;
1974 }
1975
1976
1977 /**
1978  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1979  *
1980  * @param cls the socket (set from GNUNET_MESH_connect)
1981  * @param tunnel connection to the other end
1982  * @param tunnel_ctx this is NULL
1983  * @param sender who sent the message
1984  * @param message the actual message
1985  * @param atsi performance data for the connection
1986  * @return GNUNET_OK to keep the connection open,
1987  *         GNUNET_SYSERR to close it (signal serious error)
1988  */
1989 static int
1990 client_handle_close (void *cls,
1991                      struct GNUNET_MESH_Tunnel *tunnel,
1992                      void **tunnel_ctx,
1993                      const struct GNUNET_PeerIdentity *sender,
1994                      const struct GNUNET_MessageHeader *message,
1995                      const struct GNUNET_ATS_Information*atsi)
1996 {
1997   struct GNUNET_STREAM_Socket *socket = cls;
1998
1999   return handle_close (socket,
2000                        tunnel,
2001                        sender,
2002                        (const struct GNUNET_STREAM_MessageHeader *) message,
2003                        atsi);
2004 }
2005
2006
2007 /**
2008  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2009  *
2010  * @param cls the socket (set from GNUNET_MESH_connect)
2011  * @param tunnel connection to the other end
2012  * @param tunnel_ctx this is NULL
2013  * @param sender who sent the message
2014  * @param message the actual message
2015  * @param atsi performance data for the connection
2016  * @return GNUNET_OK to keep the connection open,
2017  *         GNUNET_SYSERR to close it (signal serious error)
2018  */
2019 static int
2020 client_handle_close_ack (void *cls,
2021                          struct GNUNET_MESH_Tunnel *tunnel,
2022                          void **tunnel_ctx,
2023                          const struct GNUNET_PeerIdentity *sender,
2024                          const struct GNUNET_MessageHeader *message,
2025                          const struct GNUNET_ATS_Information *atsi)
2026 {
2027   struct GNUNET_STREAM_Socket *socket = cls;
2028
2029   return handle_generic_close_ack (socket,
2030                                    tunnel,
2031                                    sender,
2032                                    (const struct GNUNET_STREAM_MessageHeader *) 
2033                                    message,
2034                                    atsi,
2035                                    SHUT_RDWR);
2036 }
2037
2038 /*****************************/
2039 /* Server's Message Handlers */
2040 /*****************************/
2041
2042 /**
2043  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2044  *
2045  * @param cls the closure
2046  * @param tunnel connection to the other end
2047  * @param tunnel_ctx the socket
2048  * @param sender who sent the message
2049  * @param message the actual message
2050  * @param atsi performance data for the connection
2051  * @return GNUNET_OK to keep the connection open,
2052  *         GNUNET_SYSERR to close it (signal serious error)
2053  */
2054 static int
2055 server_handle_data (void *cls,
2056                     struct GNUNET_MESH_Tunnel *tunnel,
2057                     void **tunnel_ctx,
2058                     const struct GNUNET_PeerIdentity *sender,
2059                     const struct GNUNET_MessageHeader *message,
2060                     const struct GNUNET_ATS_Information*atsi)
2061 {
2062   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2063
2064   return handle_data (socket,
2065                       tunnel,
2066                       sender,
2067                       (const struct GNUNET_STREAM_DataMessage *)message,
2068                       atsi);
2069 }
2070
2071
2072 /**
2073  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2074  *
2075  * @param cls the closure
2076  * @param tunnel connection to the other end
2077  * @param tunnel_ctx the socket
2078  * @param sender who sent the message
2079  * @param message the actual message
2080  * @param atsi performance data for the connection
2081  * @return GNUNET_OK to keep the connection open,
2082  *         GNUNET_SYSERR to close it (signal serious error)
2083  */
2084 static int
2085 server_handle_hello (void *cls,
2086                      struct GNUNET_MESH_Tunnel *tunnel,
2087                      void **tunnel_ctx,
2088                      const struct GNUNET_PeerIdentity *sender,
2089                      const struct GNUNET_MessageHeader *message,
2090                      const struct GNUNET_ATS_Information*atsi)
2091 {
2092   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2093   struct GNUNET_STREAM_HelloAckMessage *reply;
2094
2095   if (0 != memcmp (sender,
2096                    &socket->other_peer,
2097                    sizeof (struct GNUNET_PeerIdentity)))
2098   {
2099     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2100                GNUNET_i2s (&socket->other_peer));
2101     return GNUNET_YES;
2102   }
2103   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2104   GNUNET_assert (socket->tunnel == tunnel);
2105   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2106              GNUNET_i2s (&socket->other_peer));
2107   switch (socket->state)
2108   {
2109   case STATE_INIT:
2110     reply = generate_hello_ack (socket, GNUNET_YES);
2111     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2112                    GNUNET_NO);
2113     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2114                    socket->control_retransmission_task_id);
2115     socket->control_retransmission_task_id =
2116       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2117                                     &control_retransmission_task, socket);
2118     break;
2119   case STATE_HELLO_WAIT:
2120     /* Perhaps our HELLO_ACK was lost */
2121     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2122                    socket->control_retransmission_task_id);
2123     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2124     socket->control_retransmission_task_id =
2125       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2126     break;
2127   default:
2128     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2129                GNUNET_i2s (&socket->other_peer), socket->state);
2130     /* FIXME: Send RESET? */
2131   }
2132   return GNUNET_OK;
2133 }
2134
2135
2136 /**
2137  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2138  *
2139  * @param cls the closure
2140  * @param tunnel connection to the other end
2141  * @param tunnel_ctx the socket
2142  * @param sender who sent the message
2143  * @param message the actual message
2144  * @param atsi performance data for the connection
2145  * @return GNUNET_OK to keep the connection open,
2146  *         GNUNET_SYSERR to close it (signal serious error)
2147  */
2148 static int
2149 server_handle_hello_ack (void *cls,
2150                          struct GNUNET_MESH_Tunnel *tunnel,
2151                          void **tunnel_ctx,
2152                          const struct GNUNET_PeerIdentity *sender,
2153                          const struct GNUNET_MessageHeader *message,
2154                          const struct GNUNET_ATS_Information*atsi)
2155 {
2156   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2157   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2158
2159   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2160                  ntohs (message->type));
2161   GNUNET_assert (socket->tunnel == tunnel);
2162   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2163   switch (socket->state)  
2164   {
2165   case STATE_HELLO_WAIT:
2166     LOG (GNUNET_ERROR_TYPE_DEBUG,
2167          "%s: Received HELLO_ACK from %s\n",
2168          GNUNET_i2s (&socket->other_peer),
2169          GNUNET_i2s (&socket->other_peer));
2170     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2171     LOG (GNUNET_ERROR_TYPE_DEBUG,
2172          "%s: Read sequence number %u\n",
2173          GNUNET_i2s (&socket->other_peer),
2174          (unsigned int) socket->read_sequence_number);
2175     socket->receiver_window_available = 
2176       ntohl (ack_message->receiver_window_size);
2177     set_state_established (NULL, socket);
2178     break;
2179   default:
2180     LOG (GNUNET_ERROR_TYPE_DEBUG,
2181          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2182   }
2183   return GNUNET_OK;
2184 }
2185
2186
2187 /**
2188  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2189  *
2190  * @param cls the closure
2191  * @param tunnel connection to the other end
2192  * @param tunnel_ctx the socket
2193  * @param sender who sent the message
2194  * @param message the actual message
2195  * @param atsi performance data for the connection
2196  * @return GNUNET_OK to keep the connection open,
2197  *         GNUNET_SYSERR to close it (signal serious error)
2198  */
2199 static int
2200 server_handle_reset (void *cls,
2201                      struct GNUNET_MESH_Tunnel *tunnel,
2202                      void **tunnel_ctx,
2203                      const struct GNUNET_PeerIdentity *sender,
2204                      const struct GNUNET_MessageHeader *message,
2205                      const struct GNUNET_ATS_Information*atsi)
2206 {
2207   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2208
2209   return GNUNET_OK;
2210 }
2211
2212
2213 /**
2214  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2215  *
2216  * @param cls the closure
2217  * @param tunnel connection to the other end
2218  * @param tunnel_ctx the socket
2219  * @param sender who sent the message
2220  * @param message the actual message
2221  * @param atsi performance data for the connection
2222  * @return GNUNET_OK to keep the connection open,
2223  *         GNUNET_SYSERR to close it (signal serious error)
2224  */
2225 static int
2226 server_handle_transmit_close (void *cls,
2227                               struct GNUNET_MESH_Tunnel *tunnel,
2228                               void **tunnel_ctx,
2229                               const struct GNUNET_PeerIdentity *sender,
2230                               const struct GNUNET_MessageHeader *message,
2231                               const struct GNUNET_ATS_Information*atsi)
2232 {
2233   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2234
2235   return handle_transmit_close (socket,
2236                                 tunnel,
2237                                 sender,
2238                                 (struct GNUNET_STREAM_MessageHeader *)message,
2239                                 atsi);
2240 }
2241
2242
2243 /**
2244  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2245  *
2246  * @param cls the closure
2247  * @param tunnel connection to the other end
2248  * @param tunnel_ctx the socket
2249  * @param sender who sent the message
2250  * @param message the actual message
2251  * @param atsi performance data for the connection
2252  * @return GNUNET_OK to keep the connection open,
2253  *         GNUNET_SYSERR to close it (signal serious error)
2254  */
2255 static int
2256 server_handle_transmit_close_ack (void *cls,
2257                                   struct GNUNET_MESH_Tunnel *tunnel,
2258                                   void **tunnel_ctx,
2259                                   const struct GNUNET_PeerIdentity *sender,
2260                                   const struct GNUNET_MessageHeader *message,
2261                                   const struct GNUNET_ATS_Information*atsi)
2262 {
2263   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2264
2265   return handle_generic_close_ack (socket,
2266                                    tunnel,
2267                                    sender,
2268                                    (const struct GNUNET_STREAM_MessageHeader *)
2269                                    message,
2270                                    atsi,
2271                                    SHUT_WR);
2272 }
2273
2274
2275 /**
2276  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2277  *
2278  * @param cls the closure
2279  * @param tunnel connection to the other end
2280  * @param tunnel_ctx the socket
2281  * @param sender who sent the message
2282  * @param message the actual message
2283  * @param atsi performance data for the connection
2284  * @return GNUNET_OK to keep the connection open,
2285  *         GNUNET_SYSERR to close it (signal serious error)
2286  */
2287 static int
2288 server_handle_receive_close (void *cls,
2289                              struct GNUNET_MESH_Tunnel *tunnel,
2290                              void **tunnel_ctx,
2291                              const struct GNUNET_PeerIdentity *sender,
2292                              const struct GNUNET_MessageHeader *message,
2293                              const struct GNUNET_ATS_Information*atsi)
2294 {
2295   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2296
2297   return
2298     handle_receive_close (socket,
2299                           tunnel,
2300                           sender,
2301                           (const struct GNUNET_STREAM_MessageHeader *) message,
2302                           atsi);
2303 }
2304
2305
2306 /**
2307  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2308  *
2309  * @param cls the closure
2310  * @param tunnel connection to the other end
2311  * @param tunnel_ctx the socket
2312  * @param sender who sent the message
2313  * @param message the actual message
2314  * @param atsi performance data for the connection
2315  * @return GNUNET_OK to keep the connection open,
2316  *         GNUNET_SYSERR to close it (signal serious error)
2317  */
2318 static int
2319 server_handle_receive_close_ack (void *cls,
2320                                  struct GNUNET_MESH_Tunnel *tunnel,
2321                                  void **tunnel_ctx,
2322                                  const struct GNUNET_PeerIdentity *sender,
2323                                  const struct GNUNET_MessageHeader *message,
2324                                  const struct GNUNET_ATS_Information*atsi)
2325 {
2326   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2327
2328   return handle_generic_close_ack (socket,
2329                                    tunnel,
2330                                    sender,
2331                                    (const struct GNUNET_STREAM_MessageHeader *)
2332                                    message,
2333                                    atsi,
2334                                    SHUT_RD);
2335 }
2336
2337
2338 /**
2339  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2340  *
2341  * @param cls the listen socket (from GNUNET_MESH_connect in
2342  *          GNUNET_STREAM_listen) 
2343  * @param tunnel connection to the other end
2344  * @param tunnel_ctx the socket
2345  * @param sender who sent the message
2346  * @param message the actual message
2347  * @param atsi performance data for the connection
2348  * @return GNUNET_OK to keep the connection open,
2349  *         GNUNET_SYSERR to close it (signal serious error)
2350  */
2351 static int
2352 server_handle_close (void *cls,
2353                      struct GNUNET_MESH_Tunnel *tunnel,
2354                      void **tunnel_ctx,
2355                      const struct GNUNET_PeerIdentity *sender,
2356                      const struct GNUNET_MessageHeader *message,
2357                      const struct GNUNET_ATS_Information*atsi)
2358 {
2359   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2360   
2361   return handle_close (socket,
2362                        tunnel,
2363                        sender,
2364                        (const struct GNUNET_STREAM_MessageHeader *) message,
2365                        atsi);
2366 }
2367
2368
2369 /**
2370  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2371  *
2372  * @param cls the closure
2373  * @param tunnel connection to the other end
2374  * @param tunnel_ctx the socket
2375  * @param sender who sent the message
2376  * @param message the actual message
2377  * @param atsi performance data for the connection
2378  * @return GNUNET_OK to keep the connection open,
2379  *         GNUNET_SYSERR to close it (signal serious error)
2380  */
2381 static int
2382 server_handle_close_ack (void *cls,
2383                          struct GNUNET_MESH_Tunnel *tunnel,
2384                          void **tunnel_ctx,
2385                          const struct GNUNET_PeerIdentity *sender,
2386                          const struct GNUNET_MessageHeader *message,
2387                          const struct GNUNET_ATS_Information*atsi)
2388 {
2389   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2390
2391   return handle_generic_close_ack (socket,
2392                                    tunnel,
2393                                    sender,
2394                                    (const struct GNUNET_STREAM_MessageHeader *) 
2395                                    message,
2396                                    atsi,
2397                                    SHUT_RDWR);
2398 }
2399
2400
2401 /**
2402  * Handler for DATA_ACK messages
2403  *
2404  * @param socket the socket through which the ack was received
2405  * @param tunnel connection to the other end
2406  * @param sender who sent the message
2407  * @param ack the acknowledgment message
2408  * @param atsi performance data for the connection
2409  * @return GNUNET_OK to keep the connection open,
2410  *         GNUNET_SYSERR to close it (signal serious error)
2411  */
2412 static int
2413 handle_ack (struct GNUNET_STREAM_Socket *socket,
2414             struct GNUNET_MESH_Tunnel *tunnel,
2415             const struct GNUNET_PeerIdentity *sender,
2416             const struct GNUNET_STREAM_AckMessage *ack,
2417             const struct GNUNET_ATS_Information*atsi)
2418 {
2419   unsigned int packet;
2420   int need_retransmission;
2421   uint32_t sequence_difference;
2422   
2423   if (0 != memcmp (sender,
2424                    &socket->other_peer,
2425                    sizeof (struct GNUNET_PeerIdentity)))
2426   {
2427     LOG (GNUNET_ERROR_TYPE_DEBUG,
2428          "%s: Received ACK from non-confirming peer\n",
2429          GNUNET_i2s (&socket->other_peer));
2430     return GNUNET_YES;
2431   }
2432   switch (socket->state)
2433   {
2434   case (STATE_ESTABLISHED):
2435   case (STATE_RECEIVE_CLOSED):
2436   case (STATE_RECEIVE_CLOSE_WAIT):
2437     if (NULL == socket->write_handle)
2438     {
2439       LOG (GNUNET_ERROR_TYPE_DEBUG,
2440            "%s: Received DATA_ACK when write_handle is NULL\n",
2441            GNUNET_i2s (&socket->other_peer));
2442       return GNUNET_OK;
2443     }
2444     sequence_difference = 
2445         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2446     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2447     {
2448       LOG (GNUNET_ERROR_TYPE_DEBUG,
2449            "%s: Received DATA_ACK with unexpected base sequence number\n",
2450            GNUNET_i2s (&socket->other_peer));
2451       LOG (GNUNET_ERROR_TYPE_DEBUG,
2452            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2453            GNUNET_i2s (&socket->other_peer),
2454            socket->write_sequence_number,
2455            ntohl (ack->base_sequence_number));
2456       return GNUNET_OK;
2457     }
2458     /* FIXME: include the case when write_handle is cancelled - ignore the 
2459        acks */
2460     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2461          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2462     /* Cancel the retransmission task */
2463     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2464     {
2465       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2466       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2467     }
2468     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2469     {
2470       if (NULL == socket->write_handle->messages[packet]) break;
2471       /* BS: Base sequence from ack; PS: sequence num of current packet */
2472       sequence_difference = ntohl (ack->base_sequence_number)
2473         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2474       if ((0 == sequence_difference) ||
2475           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2476         continue; /* The message in our handle is not yet received */
2477       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2478       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2479       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2480                             packet, GNUNET_YES);
2481     }
2482     /* Update the receive window remaining
2483        FIXME : Should update with the value from a data ack with greater
2484        sequence number */
2485     socket->receiver_window_available = 
2486       ntohl (ack->receive_window_remaining);
2487     /* Check if we have received all acknowledgements */
2488     need_retransmission = GNUNET_NO;
2489     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2490     {
2491       if (NULL == socket->write_handle->messages[packet]) break;
2492       if (GNUNET_YES != ackbitmap_is_bit_set 
2493           (&socket->write_handle->ack_bitmap,packet))
2494       {
2495         need_retransmission = GNUNET_YES;
2496         break;
2497       }
2498     }
2499     if (GNUNET_YES == need_retransmission)
2500     {
2501       write_data (socket);
2502     }
2503     else      /* We have to call the write continuation callback now */
2504     {
2505       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2506       
2507       /* Free the packets */
2508       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2509       {
2510         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2511       }
2512       write_handle = socket->write_handle;
2513       socket->write_handle = NULL;
2514       if (NULL != write_handle->write_cont)
2515         write_handle->write_cont (write_handle->write_cont_cls,
2516                                   socket->status,
2517                                   write_handle->size);
2518       /* We are done with the write handle - Freeing it */
2519       GNUNET_free (write_handle);
2520       LOG (GNUNET_ERROR_TYPE_DEBUG,
2521            "%s: Write completion callback completed\n",
2522            GNUNET_i2s (&socket->other_peer));      
2523     }
2524     break;
2525   default:
2526     break;
2527   }
2528   return GNUNET_OK;
2529 }
2530
2531
2532 /**
2533  * Handler for DATA_ACK messages
2534  *
2535  * @param cls the 'struct GNUNET_STREAM_Socket'
2536  * @param tunnel connection to the other end
2537  * @param tunnel_ctx unused
2538  * @param sender who sent the message
2539  * @param message the actual message
2540  * @param atsi performance data for the connection
2541  * @return GNUNET_OK to keep the connection open,
2542  *         GNUNET_SYSERR to close it (signal serious error)
2543  */
2544 static int
2545 client_handle_ack (void *cls,
2546                    struct GNUNET_MESH_Tunnel *tunnel,
2547                    void **tunnel_ctx,
2548                    const struct GNUNET_PeerIdentity *sender,
2549                    const struct GNUNET_MessageHeader *message,
2550                    const struct GNUNET_ATS_Information*atsi)
2551 {
2552   struct GNUNET_STREAM_Socket *socket = cls;
2553   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2554  
2555   return handle_ack (socket, tunnel, sender, ack, atsi);
2556 }
2557
2558
2559 /**
2560  * Handler for DATA_ACK messages
2561  *
2562  * @param cls the server's listen socket
2563  * @param tunnel connection to the other end
2564  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2565  * @param sender who sent the message
2566  * @param message the actual message
2567  * @param atsi performance data for the connection
2568  * @return GNUNET_OK to keep the connection open,
2569  *         GNUNET_SYSERR to close it (signal serious error)
2570  */
2571 static int
2572 server_handle_ack (void *cls,
2573                    struct GNUNET_MESH_Tunnel *tunnel,
2574                    void **tunnel_ctx,
2575                    const struct GNUNET_PeerIdentity *sender,
2576                    const struct GNUNET_MessageHeader *message,
2577                    const struct GNUNET_ATS_Information*atsi)
2578 {
2579   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2580   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2581  
2582   return handle_ack (socket, tunnel, sender, ack, atsi);
2583 }
2584
2585
2586 /**
2587  * For client message handlers, the stream socket is in the
2588  * closure argument.
2589  */
2590 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2591   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2592   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2593    sizeof (struct GNUNET_STREAM_AckMessage) },
2594   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2595    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2596   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2597    sizeof (struct GNUNET_STREAM_MessageHeader)},
2598   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2599    sizeof (struct GNUNET_STREAM_MessageHeader)},
2600   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2601    sizeof (struct GNUNET_STREAM_MessageHeader)},
2602   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2603    sizeof (struct GNUNET_STREAM_MessageHeader)},
2604   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2605    sizeof (struct GNUNET_STREAM_MessageHeader)},
2606   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2607    sizeof (struct GNUNET_STREAM_MessageHeader)},
2608   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2609    sizeof (struct GNUNET_STREAM_MessageHeader)},
2610   {NULL, 0, 0}
2611 };
2612
2613
2614 /**
2615  * For server message handlers, the stream socket is in the
2616  * tunnel context, and the listen socket in the closure argument.
2617  */
2618 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2619   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2620   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2621    sizeof (struct GNUNET_STREAM_AckMessage) },
2622   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2623    sizeof (struct GNUNET_STREAM_MessageHeader)},
2624   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2625    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2626   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2627    sizeof (struct GNUNET_STREAM_MessageHeader)},
2628   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2629    sizeof (struct GNUNET_STREAM_MessageHeader)},
2630   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2631    sizeof (struct GNUNET_STREAM_MessageHeader)},
2632   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2633    sizeof (struct GNUNET_STREAM_MessageHeader)},
2634   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2635    sizeof (struct GNUNET_STREAM_MessageHeader)},
2636   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2637    sizeof (struct GNUNET_STREAM_MessageHeader)},
2638   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2639    sizeof (struct GNUNET_STREAM_MessageHeader)},
2640   {NULL, 0, 0}
2641 };
2642
2643
2644 /**
2645  * Function called when our target peer is connected to our tunnel
2646  *
2647  * @param cls the socket for which this tunnel is created
2648  * @param peer the peer identity of the target
2649  * @param atsi performance data for the connection
2650  */
2651 static void
2652 mesh_peer_connect_callback (void *cls,
2653                             const struct GNUNET_PeerIdentity *peer,
2654                             const struct GNUNET_ATS_Information * atsi)
2655 {
2656   struct GNUNET_STREAM_Socket *socket = cls;
2657   struct GNUNET_STREAM_MessageHeader *message;
2658   
2659   if (0 != memcmp (peer,
2660                    &socket->other_peer,
2661                    sizeof (struct GNUNET_PeerIdentity)))
2662   {
2663     LOG (GNUNET_ERROR_TYPE_DEBUG,
2664          "%s: A peer which is not our target has connected to our tunnel\n",
2665          GNUNET_i2s(peer));
2666     return;
2667   }
2668   LOG (GNUNET_ERROR_TYPE_DEBUG,
2669        "%s: Target peer %s connected\n",
2670        GNUNET_i2s (&socket->other_peer),
2671        GNUNET_i2s (&socket->other_peer));
2672   /* Set state to INIT */
2673   socket->state = STATE_INIT;
2674   /* Send HELLO message */
2675   message = generate_hello ();
2676   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2677   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2678                  socket->control_retransmission_task_id);
2679   socket->control_retransmission_task_id =
2680     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2681                                   &control_retransmission_task, socket);
2682 }
2683
2684
2685 /**
2686  * Function called when our target peer is disconnected from our tunnel
2687  *
2688  * @param cls the socket associated which this tunnel
2689  * @param peer the peer identity of the target
2690  */
2691 static void
2692 mesh_peer_disconnect_callback (void *cls,
2693                                const struct GNUNET_PeerIdentity *peer)
2694 {
2695   struct GNUNET_STREAM_Socket *socket=cls;
2696   
2697   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2698   LOG (GNUNET_ERROR_TYPE_DEBUG,
2699        "%s: Other peer %s disconnected \n",
2700        GNUNET_i2s (&socket->other_peer),
2701        GNUNET_i2s (&socket->other_peer));
2702 }
2703
2704
2705 /**
2706  * Method called whenever a peer creates a tunnel to us
2707  *
2708  * @param cls closure
2709  * @param tunnel new handle to the tunnel
2710  * @param initiator peer that started the tunnel
2711  * @param atsi performance information for the tunnel
2712  * @return initial tunnel context for the tunnel
2713  *         (can be NULL -- that's not an error)
2714  */
2715 static void *
2716 new_tunnel_notify (void *cls,
2717                    struct GNUNET_MESH_Tunnel *tunnel,
2718                    const struct GNUNET_PeerIdentity *initiator,
2719                    const struct GNUNET_ATS_Information *atsi)
2720 {
2721   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2722   struct GNUNET_STREAM_Socket *socket;
2723
2724   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2725      from the same peer again until the socket is closed */
2726
2727   if (GNUNET_NO == lsocket->listening)
2728   {
2729     GNUNET_MESH_tunnel_destroy (tunnel);
2730     return NULL;
2731   }
2732   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2733   socket->other_peer = *initiator;
2734   socket->tunnel = tunnel;
2735   socket->state = STATE_INIT;
2736   socket->lsocket = lsocket;
2737   socket->retransmit_timeout = lsocket->retransmit_timeout;
2738   socket->testing_active = lsocket->testing_active;
2739   socket->testing_set_write_sequence_number_value =
2740       lsocket->testing_set_write_sequence_number_value;
2741   socket->max_payload_size = lsocket->max_payload_size;
2742   LOG (GNUNET_ERROR_TYPE_DEBUG,
2743        "%s: Peer %s initiated tunnel to us\n", 
2744        GNUNET_i2s (&socket->other_peer),
2745        GNUNET_i2s (&socket->other_peer));
2746   return socket;
2747 }
2748
2749
2750 /**
2751  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2752  * any associated state.  This function is NOT called if the client has
2753  * explicitly asked for the tunnel to be destroyed using
2754  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2755  * the tunnel.
2756  *
2757  * @param cls closure (set from GNUNET_MESH_connect)
2758  * @param tunnel connection to the other end (henceforth invalid)
2759  * @param tunnel_ctx place where local state associated
2760  *                   with the tunnel is stored
2761  */
2762 static void 
2763 tunnel_cleaner (void *cls,
2764                 const struct GNUNET_MESH_Tunnel *tunnel,
2765                 void *tunnel_ctx)
2766 {
2767   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2768   struct MessageQueue *head;
2769
2770   GNUNET_assert (tunnel == socket->tunnel);
2771   GNUNET_break_op(0);
2772   LOG (GNUNET_ERROR_TYPE_DEBUG,
2773        "%s: Peer %s has terminated connection abruptly\n",
2774        GNUNET_i2s (&socket->other_peer),
2775        GNUNET_i2s (&socket->other_peer));
2776   socket->status = GNUNET_STREAM_SHUTDOWN;
2777   /* Clear Transmit handles */
2778   if (NULL != socket->transmit_handle)
2779   {
2780     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2781     socket->transmit_handle = NULL;
2782   }
2783   /* Stop Tasks using socket->tunnel */
2784   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2785   {
2786     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2787     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2788   }
2789   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2790   {
2791     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2792     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2793   }  
2794   /* Terminate the control retransmission tasks */
2795   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2796   {
2797     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2798     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2799   }
2800   /* Clear Transmit handles */
2801   if (NULL != socket->transmit_handle)
2802   {
2803     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2804     socket->transmit_handle = NULL;
2805   }
2806   /* Clear existing message queue */
2807   while (NULL != (head = socket->queue_head)) {
2808     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2809                                  socket->queue_tail,
2810                                  head);
2811     GNUNET_free (head->message);
2812     GNUNET_free (head);
2813   }
2814   socket->tunnel = NULL;
2815 }
2816
2817
2818 /**
2819  * Callback to signal timeout on lockmanager lock acquire
2820  *
2821  * @param cls the ListenSocket
2822  * @param tc the scheduler task context
2823  */
2824 static void
2825 lockmanager_acquire_timeout (void *cls, 
2826                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2827 {
2828   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2829   GNUNET_STREAM_ListenCallback listen_cb;
2830   void *listen_cb_cls;
2831
2832   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2833   listen_cb = lsocket->listen_cb;
2834   listen_cb_cls = lsocket->listen_cb_cls;
2835   if (NULL != listen_cb)
2836     listen_cb (listen_cb_cls, NULL, NULL);
2837 }
2838
2839
2840 /**
2841  * Callback to notify us on the status changes on app_port lock
2842  *
2843  * @param cls the ListenSocket
2844  * @param domain the domain name of the lock
2845  * @param lock the app_port
2846  * @param status the current status of the lock
2847  */
2848 static void
2849 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2850                        enum GNUNET_LOCKMANAGER_Status status)
2851 {
2852   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2853
2854   GNUNET_assert (lock == (uint32_t) lsocket->port);
2855   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2856   {
2857     lsocket->listening = GNUNET_YES;
2858     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2859     {
2860       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2861       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2862     }
2863     if (NULL == lsocket->mesh)
2864     {
2865       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2866
2867       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2868                                            lsocket, /* Closure */
2869                                            &new_tunnel_notify,
2870                                            &tunnel_cleaner,
2871                                            server_message_handlers,
2872                                            ports);
2873       GNUNET_assert (NULL != lsocket->mesh);
2874       if (NULL != lsocket->listen_ok_cb)
2875       {
2876         (void) lsocket->listen_ok_cb ();
2877       }
2878     }
2879   }
2880   if (GNUNET_LOCKMANAGER_RELEASE == status)
2881     lsocket->listening = GNUNET_NO;
2882 }
2883
2884
2885 /*****************/
2886 /* API functions */
2887 /*****************/
2888
2889
2890 /**
2891  * Tries to open a stream to the target peer
2892  *
2893  * @param cfg configuration to use
2894  * @param target the target peer to which the stream has to be opened
2895  * @param app_port the application port number which uniquely identifies this
2896  *            stream
2897  * @param open_cb this function will be called after stream has be established;
2898  *          cannot be NULL
2899  * @param open_cb_cls the closure for open_cb
2900  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2901  * @return if successful it returns the stream socket; NULL if stream cannot be
2902  *         opened 
2903  */
2904 struct GNUNET_STREAM_Socket *
2905 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2906                     const struct GNUNET_PeerIdentity *target,
2907                     GNUNET_MESH_ApplicationType app_port,
2908                     GNUNET_STREAM_OpenCallback open_cb,
2909                     void *open_cb_cls,
2910                     ...)
2911 {
2912   struct GNUNET_STREAM_Socket *socket;
2913   enum GNUNET_STREAM_Option option;
2914   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2915   va_list vargs;
2916   uint16_t payload_size;
2917
2918   LOG (GNUNET_ERROR_TYPE_DEBUG,
2919        "%s\n", __func__);
2920   GNUNET_assert (NULL != open_cb);
2921   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2922   socket->other_peer = *target;
2923   socket->open_cb = open_cb;
2924   socket->open_cls = open_cb_cls;
2925   /* Set defaults */
2926   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2927   socket->testing_active = GNUNET_NO;
2928   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2929   va_start (vargs, open_cb_cls); /* Parse variable args */
2930   do {
2931     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2932     switch (option)
2933     {
2934     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2935       /* Expect struct GNUNET_TIME_Relative */
2936       socket->retransmit_timeout = va_arg (vargs,
2937                                            struct GNUNET_TIME_Relative);
2938       break;
2939     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2940       socket->testing_active = GNUNET_YES;
2941       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2942                                                                 uint32_t);
2943       break;
2944     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2945       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2946       break;
2947     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2948       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2949       break;
2950     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2951       payload_size = (uint16_t) va_arg (vargs, unsigned int);
2952       GNUNET_assert (0 != payload_size);
2953       if (payload_size < socket->max_payload_size)
2954         socket->max_payload_size = payload_size;
2955       break;
2956     case GNUNET_STREAM_OPTION_END:
2957       break;
2958     }
2959   } while (GNUNET_STREAM_OPTION_END != option);
2960   va_end (vargs);               /* End of variable args parsing */
2961   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2962                                       socket, /* cls */
2963                                       NULL, /* No inbound tunnel handler */
2964                                       NULL, /* No in-tunnel cleaner */
2965                                       client_message_handlers,
2966                                       ports); /* We don't get inbound tunnels */
2967   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2968   {
2969     GNUNET_free (socket);
2970     return NULL;
2971   }
2972   /* Now create the mesh tunnel to target */
2973   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2974   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2975                                               NULL, /* Tunnel context */
2976                                               &mesh_peer_connect_callback,
2977                                               &mesh_peer_disconnect_callback,
2978                                               socket);
2979   GNUNET_assert (NULL != socket->tunnel);
2980   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2981                                         &socket->other_peer);
2982   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2983   return socket;
2984 }
2985
2986
2987 /**
2988  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2989  *
2990  * @param socket the stream socket
2991  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2992  * @param completion_cb the callback that will be called upon successful
2993  *          shutdown of given operation
2994  * @param completion_cls the closure for the completion callback
2995  * @return the shutdown handle
2996  */
2997 struct GNUNET_STREAM_ShutdownHandle *
2998 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2999                         int operation,
3000                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3001                         void *completion_cls)
3002 {
3003   struct GNUNET_STREAM_ShutdownHandle *handle;
3004   struct GNUNET_STREAM_MessageHeader *msg;
3005   
3006   GNUNET_assert (NULL == socket->shutdown_handle);
3007
3008   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3009   handle->socket = socket;
3010   handle->completion_cb = completion_cb;
3011   handle->completion_cls = completion_cls;
3012   socket->shutdown_handle = handle;
3013
3014   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3015   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3016   switch (operation)
3017   {
3018   case SHUT_RD:
3019     handle->operation = SHUT_RD;
3020     if (NULL != socket->read_handle)
3021       LOG (GNUNET_ERROR_TYPE_WARNING,
3022            "Existing read handle should be cancelled before shutting"
3023            " down reading\n");
3024     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3025     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3026                    GNUNET_NO);
3027     break;
3028   case SHUT_WR:
3029     handle->operation = SHUT_WR;
3030     if (NULL != socket->write_handle)
3031       LOG (GNUNET_ERROR_TYPE_WARNING,
3032            "Existing write handle should be cancelled before shutting"
3033            " down writing\n");
3034     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3035     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3036                    GNUNET_NO);
3037     break;
3038   case SHUT_RDWR:
3039     handle->operation = SHUT_RDWR;
3040     if (NULL != socket->write_handle)
3041       LOG (GNUNET_ERROR_TYPE_WARNING,
3042            "Existing write handle should be cancelled before shutting"
3043            " down writing\n");
3044     if (NULL != socket->read_handle)
3045       LOG (GNUNET_ERROR_TYPE_WARNING,
3046            "Existing read handle should be cancelled before shutting"
3047            " down reading\n");
3048     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3049     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3050     break;
3051   default:
3052     LOG (GNUNET_ERROR_TYPE_WARNING,
3053          "GNUNET_STREAM_shutdown called with invalid value for "
3054          "parameter operation -- Ignoring\n");
3055     GNUNET_free (msg);
3056     GNUNET_free (handle);
3057     return NULL;
3058   }
3059   handle->close_msg_retransmission_task_id =
3060     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3061                                   &close_msg_retransmission_task,
3062                                   handle);
3063   return handle;
3064 }
3065
3066
3067 /**
3068  * Cancels a pending shutdown
3069  *
3070  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3071  */
3072 void
3073 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3074 {
3075   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3076     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3077   handle->socket->shutdown_handle = NULL;
3078   GNUNET_free (handle);
3079 }
3080
3081
3082 /**
3083  * Closes the stream
3084  *
3085  * @param socket the stream socket
3086  */
3087 void
3088 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3089 {
3090   struct MessageQueue *head;
3091
3092   if (NULL != socket->read_handle)
3093   {
3094     LOG (GNUNET_ERROR_TYPE_WARNING,
3095          "Closing STREAM socket when a read handle is pending\n");
3096     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3097   }
3098   if (NULL != socket->write_handle)
3099   {
3100     LOG (GNUNET_ERROR_TYPE_WARNING,
3101          "Closing STREAM socket when a write handle is pending\n");
3102     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3103     //socket->write_handle = NULL;
3104   }
3105   /* Terminate the ack'ing task if they are still present */
3106   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3107   {
3108     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3109     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3110   }
3111   /* Terminate the control retransmission tasks */
3112   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3113   {
3114     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3115   }
3116   /* Clear Transmit handles */
3117   if (NULL != socket->transmit_handle)
3118   {
3119     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3120     socket->transmit_handle = NULL;
3121   }
3122   /* Clear existing message queue */
3123   while (NULL != (head = socket->queue_head)) {
3124     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3125                                  socket->queue_tail,
3126                                  head);
3127     GNUNET_free (head->message);
3128     GNUNET_free (head);
3129   }
3130   /* Close associated tunnel */
3131   if (NULL != socket->tunnel)
3132   {
3133     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3134     socket->tunnel = NULL;
3135   }
3136   /* Close mesh connection */
3137   if (NULL != socket->mesh && NULL == socket->lsocket)
3138   {
3139     GNUNET_MESH_disconnect (socket->mesh);
3140     socket->mesh = NULL;
3141   }  
3142   /* Release receive buffer */
3143   if (NULL != socket->receive_buffer)
3144   {
3145     GNUNET_free (socket->receive_buffer);
3146   }
3147   GNUNET_free (socket);
3148 }
3149
3150
3151 /**
3152  * Listens for stream connections for a specific application ports
3153  *
3154  * @param cfg the configuration to use
3155  * @param app_port the application port for which new streams will be accepted
3156  * @param listen_cb this function will be called when a peer tries to establish
3157  *            a stream with us
3158  * @param listen_cb_cls closure for listen_cb
3159  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3160  * @return listen socket, NULL for any error
3161  */
3162 struct GNUNET_STREAM_ListenSocket *
3163 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3164                       GNUNET_MESH_ApplicationType app_port,
3165                       GNUNET_STREAM_ListenCallback listen_cb,
3166                       void *listen_cb_cls,
3167                       ...)
3168 {
3169   /* FIXME: Add variable args for passing configration options? */
3170   struct GNUNET_STREAM_ListenSocket *lsocket;
3171   struct GNUNET_TIME_Relative listen_timeout;
3172   enum GNUNET_STREAM_Option option;
3173   va_list vargs;
3174   uint16_t payload_size;
3175
3176   GNUNET_assert (NULL != listen_cb);
3177   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3178   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3179   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3180   if (NULL == lsocket->lockmanager)
3181   {
3182     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3183     GNUNET_free (lsocket);
3184     return NULL;
3185   }
3186   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3187   /* Set defaults */
3188   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3189   lsocket->testing_active = GNUNET_NO;
3190   lsocket->listen_ok_cb = NULL;
3191   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3192   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3193   va_start (vargs, listen_cb_cls);
3194   do {
3195     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3196     switch (option)
3197     {
3198     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3199       lsocket->retransmit_timeout = va_arg (vargs,
3200                                             struct GNUNET_TIME_Relative);
3201       break;
3202     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3203       lsocket->testing_active = GNUNET_YES;
3204       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3205                                                                  uint32_t);
3206       break;
3207     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3208       listen_timeout = GNUNET_TIME_relative_multiply
3209         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3210       break;
3211     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3212       lsocket->listen_ok_cb = va_arg (vargs,
3213                                       GNUNET_STREAM_ListenSuccessCallback);
3214       break;
3215     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3216       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3217       GNUNET_assert (0 != payload_size);
3218       if (payload_size < lsocket->max_payload_size)
3219         lsocket->max_payload_size = payload_size;
3220       break;
3221     case GNUNET_STREAM_OPTION_END:
3222       break;
3223     }
3224   } while (GNUNET_STREAM_OPTION_END != option);
3225   va_end (vargs);
3226   lsocket->port = app_port;
3227   lsocket->listen_cb = listen_cb;
3228   lsocket->listen_cb_cls = listen_cb_cls;
3229   lsocket->locking_request = 
3230     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3231                                      (uint32_t) lsocket->port,
3232                                      &lock_status_change_cb, lsocket);
3233   lsocket->lockmanager_acquire_timeout_task =
3234     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3235                                   &lockmanager_acquire_timeout, lsocket);
3236   return lsocket;
3237 }
3238
3239
3240 /**
3241  * Closes the listen socket
3242  *
3243  * @param lsocket the listen socket
3244  */
3245 void
3246 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3247 {
3248   /* Close MESH connection */
3249   if (NULL != lsocket->mesh)
3250     GNUNET_MESH_disconnect (lsocket->mesh);
3251   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3252   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3253     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3254   if (NULL != lsocket->locking_request)
3255     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3256   if (NULL != lsocket->lockmanager)
3257     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3258   GNUNET_free (lsocket);
3259 }
3260
3261
3262 /**
3263  * Tries to write the given data to the stream. The maximum size of data that
3264  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3265  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3266  * violation, however only the said number of maximum bytes will be written.
3267  *
3268  * @param socket the socket representing a stream
3269  * @param data the data buffer from where the data is written into the stream
3270  * @param size the number of bytes to be written from the data buffer
3271  * @param timeout the timeout period
3272  * @param write_cont the function to call upon writing some bytes into the
3273  *          stream 
3274  * @param write_cont_cls the closure
3275  *
3276  * @return handle to cancel the operation; if a previous write is pending or
3277  *           the stream has been shutdown for this operation then write_cont is
3278  *           immediately called and NULL is returned.
3279  */
3280 struct GNUNET_STREAM_IOWriteHandle *
3281 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3282                      const void *data,
3283                      size_t size,
3284                      struct GNUNET_TIME_Relative timeout,
3285                      GNUNET_STREAM_CompletionContinuation write_cont,
3286                      void *write_cont_cls)
3287 {
3288   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3289   struct GNUNET_STREAM_DataMessage *data_msg;
3290   const void *sweep;
3291   struct GNUNET_TIME_Relative ack_deadline;
3292   unsigned int num_needed_packets;
3293   unsigned int packet;
3294   uint32_t packet_size;
3295   uint32_t payload_size;
3296   uint16_t max_data_packet_size;
3297
3298   LOG (GNUNET_ERROR_TYPE_DEBUG,
3299        "%s\n", __func__);
3300   if (NULL != socket->write_handle)
3301   {
3302     GNUNET_break (0);
3303     return NULL;
3304   }
3305   switch (socket->state)
3306   {
3307   case STATE_TRANSMIT_CLOSED:
3308   case STATE_TRANSMIT_CLOSE_WAIT:
3309   case STATE_CLOSED:
3310   case STATE_CLOSE_WAIT:
3311     if (NULL != write_cont)
3312       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3313     LOG (GNUNET_ERROR_TYPE_DEBUG,
3314          "%s() END\n", __func__);
3315     return NULL;
3316   case STATE_INIT:
3317   case STATE_LISTEN:
3318   case STATE_HELLO_WAIT:
3319     if (NULL != write_cont)
3320       /* FIXME: GNUNET_STREAM_SYSERR?? */
3321       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3322     LOG (GNUNET_ERROR_TYPE_DEBUG,
3323          "%s() END\n", __func__);
3324     return NULL;
3325   case STATE_ESTABLISHED:
3326   case STATE_RECEIVE_CLOSED:
3327   case STATE_RECEIVE_CLOSE_WAIT:
3328     break;
3329   }
3330   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3331     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3332   num_needed_packets =
3333       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3334   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3335   io_handle->socket = socket;
3336   io_handle->write_cont = write_cont;
3337   io_handle->write_cont_cls = write_cont_cls;
3338   io_handle->size = size;
3339   io_handle->packets_sent = 0;
3340   sweep = data;
3341   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3342      determined from RTT */
3343   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3344   /* Divide the given buffer into packets for sending */
3345   max_data_packet_size =
3346       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3347   for (packet=0; packet < num_needed_packets; packet++)
3348   {
3349     if ((packet + 1) * socket->max_payload_size < size) 
3350     {
3351       payload_size = socket->max_payload_size;
3352       packet_size = max_data_packet_size;
3353     }
3354     else 
3355     {
3356       payload_size = size - packet * socket->max_payload_size;
3357       packet_size = 
3358           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3359     }
3360     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3361     io_handle->messages[packet]->header.header.size = htons (packet_size);
3362     io_handle->messages[packet]->header.header.type =
3363       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3364     io_handle->messages[packet]->sequence_number =
3365       htonl (socket->write_sequence_number++);
3366     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3367     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3368        determined from RTT */
3369     io_handle->messages[packet]->ack_deadline =
3370       GNUNET_TIME_relative_hton (ack_deadline);
3371     data_msg = io_handle->messages[packet];
3372     /* Copy data from given buffer to the packet */
3373     memcpy (&data_msg[1], sweep, payload_size);
3374     sweep += payload_size;
3375     socket->write_offset += payload_size;
3376   }
3377   /* ack the last data message. FIXME: remove when we figure out how to do this
3378      using RTT */
3379   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3380       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3381   socket->write_handle = io_handle;
3382   write_data (socket);
3383   LOG (GNUNET_ERROR_TYPE_DEBUG,
3384        "%s() END\n", __func__);
3385   return io_handle;
3386 }
3387
3388
3389 /**
3390  * Tries to read data from the stream.
3391  *
3392  * @param socket the socket representing a stream
3393  * @param timeout the timeout period
3394  * @param proc function to call with data (once only)
3395  * @param proc_cls the closure for proc
3396  *
3397  * @return handle to cancel the operation; NULL is returned if: the stream has
3398  *           been shutdown for this type of opeartion (the DataProcessor is
3399  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3400  *           read handle is present (only one read handle per socket is present
3401  *           at any time)
3402  */
3403 struct GNUNET_STREAM_IOReadHandle *
3404 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3405                     struct GNUNET_TIME_Relative timeout,
3406                     GNUNET_STREAM_DataProcessor proc,
3407                     void *proc_cls)
3408 {
3409   struct GNUNET_STREAM_IOReadHandle *read_handle;
3410   
3411   LOG (GNUNET_ERROR_TYPE_DEBUG,
3412        "%s: %s()\n", 
3413        GNUNET_i2s (&socket->other_peer),
3414        __func__);
3415   /* Return NULL if there is already a read handle; the user has to cancel that
3416      first before continuing or has to wait until it is completed */
3417   if (NULL != socket->read_handle) 
3418     return NULL;
3419   GNUNET_assert (NULL != proc);
3420   switch (socket->state)
3421   {
3422   case STATE_RECEIVE_CLOSED:
3423   case STATE_RECEIVE_CLOSE_WAIT:
3424   case STATE_CLOSED:
3425   case STATE_CLOSE_WAIT:
3426     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3427     LOG (GNUNET_ERROR_TYPE_DEBUG,
3428          "%s: %s() END\n",
3429          GNUNET_i2s (&socket->other_peer),
3430          __func__);
3431     return NULL;
3432   default:
3433     break;
3434   }
3435   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3436   read_handle->proc = proc;
3437   read_handle->proc_cls = proc_cls;
3438   read_handle->socket = socket;
3439   socket->read_handle = read_handle;
3440   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3441                                           0))
3442     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3443                                                           socket);   
3444   read_handle->read_io_timeout_task_id =
3445       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3446   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3447        GNUNET_i2s (&socket->other_peer), __func__);
3448   return read_handle;
3449 }
3450
3451
3452 /**
3453  * Cancel pending write operation.
3454  *
3455  * @param ioh handle to operation to cancel
3456  */
3457 void
3458 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3459 {
3460   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3461   unsigned int packet;
3462
3463   GNUNET_assert (NULL != socket->write_handle);
3464   GNUNET_assert (socket->write_handle == ioh);
3465
3466   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3467   {
3468     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3469     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3470   }
3471
3472   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3473   {
3474     if (NULL == ioh->messages[packet]) break;
3475     GNUNET_free (ioh->messages[packet]);
3476   }
3477       
3478   GNUNET_free (socket->write_handle);
3479   socket->write_handle = NULL;
3480 }
3481
3482
3483 /**
3484  * Cancel pending read operation.
3485  *
3486  * @param ioh handle to operation to cancel
3487  */
3488 void
3489 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3490 {
3491   struct GNUNET_STREAM_Socket *socket;
3492   
3493   socket = ioh->socket;
3494   GNUNET_assert (NULL != socket->read_handle);
3495   GNUNET_assert (ioh == socket->read_handle);
3496   /* Read io time task should be there; if it is already executed then this
3497   read handle is not valid; However upon scheduler shutdown the read io task
3498   may be executed before */
3499   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3500     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3501   /* reading task may be present; if so we have to stop it */
3502   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3503     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3504   GNUNET_free (ioh);
3505   socket->read_handle = NULL;
3506 }
3507
3508 /* end of stream_api.c */