coverity fixes
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * Retransmission timeout
179    */
180   struct GNUNET_TIME_Relative retransmit_timeout;
181
182   /**
183    * The Acknowledgement Bitmap
184    */
185   GNUNET_STREAM_AckBitmap ack_bitmap;
186
187   /**
188    * Time when the Acknowledgement was queued
189    */
190   struct GNUNET_TIME_Absolute ack_time_registered;
191
192   /**
193    * Queued Acknowledgement deadline
194    */
195   struct GNUNET_TIME_Relative ack_time_deadline;
196
197   /**
198    * The mesh handle
199    */
200   struct GNUNET_MESH_Handle *mesh;
201
202   /**
203    * The mesh tunnel handle
204    */
205   struct GNUNET_MESH_Tunnel *tunnel;
206
207   /**
208    * Stream open closure
209    */
210   void *open_cls;
211
212   /**
213    * Stream open callback
214    */
215   GNUNET_STREAM_OpenCallback open_cb;
216
217   /**
218    * The current transmit handle (if a pending transmit request exists)
219    */
220   struct GNUNET_MESH_TransmitHandle *transmit_handle;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for retransmission task after timeout
265    */
266   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
267
268   /**
269    * Task identifier for retransmission of control messages
270    */
271   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * The state of the protocol associated with this socket
280    */
281   enum State state;
282
283   /**
284    * The status of the socket
285    */
286   enum GNUNET_STREAM_Status status;
287
288   /**
289    * The number of previous timeouts; FIXME: currently not used
290    */
291   unsigned int retries;
292
293   /**
294    * The application port number (type: uint32_t)
295    */
296   GNUNET_MESH_ApplicationType app_port;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * The write sequence number to be set incase of testing
305    */
306   uint32_t testing_set_write_sequence_number_value;
307
308   /**
309    * Write sequence number. Set to random when sending HELLO(client) and
310    * HELLO_ACK(server) 
311    */
312   uint32_t write_sequence_number;
313
314   /**
315    * Read sequence number. This number's value is determined during handshake
316    */
317   uint32_t read_sequence_number;
318
319   /**
320    * The receiver buffer size
321    */
322   uint32_t receive_buffer_size;
323
324   /**
325    * The receiver buffer boundaries
326    */
327   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
328
329   /**
330    * receiver's available buffer after the last acknowledged packet
331    */
332   uint32_t receiver_window_available;
333
334   /**
335    * The offset pointer used during write operation
336    */
337   uint32_t write_offset;
338
339   /**
340    * The offset after which we are expecting data
341    */
342   uint32_t read_offset;
343
344   /**
345    * The offset upto which user has read from the received buffer
346    */
347   uint32_t copy_offset;
348
349   /**
350    * The maximum size of the data message payload this stream handle can send
351    */
352   uint16_t max_payload_size;
353 };
354
355
356 /**
357  * A socket for listening
358  */
359 struct GNUNET_STREAM_ListenSocket
360 {
361   /**
362    * The mesh handle
363    */
364   struct GNUNET_MESH_Handle *mesh;
365
366   /**
367    * Our configuration
368    */
369   struct GNUNET_CONFIGURATION_Handle *cfg;
370
371   /**
372    * Handle to the lock manager service
373    */
374   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
375
376   /**
377    * The active LockingRequest from lockmanager
378    */
379   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
380
381   /**
382    * Callback to call after acquring a lock and listening
383    */
384   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
385
386   /**
387    * The callback function which is called after successful opening socket
388    */
389   GNUNET_STREAM_ListenCallback listen_cb;
390
391   /**
392    * The call back closure
393    */
394   void *listen_cb_cls;
395
396   /**
397    * The service port
398    */
399   GNUNET_MESH_ApplicationType port;
400   
401   /**
402    * The id of the lockmanager timeout task
403    */
404   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
405
406   /**
407    * The retransmit timeout
408    */
409   struct GNUNET_TIME_Relative retransmit_timeout;
410   
411   /**
412    * Listen enabled?
413    */
414   int listening;
415
416   /**
417    * Whether testing mode is active or not
418    */
419   int testing_active;
420
421   /**
422    * The write sequence number to be set incase of testing
423    */
424   uint32_t testing_set_write_sequence_number_value;
425
426   /**
427    * The maximum size of the data message payload this stream handle can send
428    */
429   uint16_t max_payload_size;
430
431 };
432
433
434 /**
435  * The IO Write Handle
436  */
437 struct GNUNET_STREAM_IOWriteHandle
438 {
439   /**
440    * The socket to which this write handle is associated
441    */
442   struct GNUNET_STREAM_Socket *socket;
443
444   /**
445    * The packet_buffers associated with this Handle
446    */
447   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
448
449   /**
450    * The write continuation callback
451    */
452   GNUNET_STREAM_CompletionContinuation write_cont;
453
454   /**
455    * Write continuation closure
456    */
457   void *write_cont_cls;
458
459   /**
460    * The bitmap of this IOHandle; Corresponding bit for a message is set when
461    * it has been acknowledged by the receiver
462    */
463   GNUNET_STREAM_AckBitmap ack_bitmap;
464
465   /**
466    * Number of bytes in this write handle
467    */
468   size_t size;
469
470   /**
471    * Number of packets already transmitted from this IO handle. Retransmitted
472    * packets are not taken into account here. This is used to determine which
473    * packets account for retransmission and which packets occupy buffer space at
474    * the receiver.
475    */
476   unsigned int packets_sent;
477 };
478
479
480 /**
481  * The IO Read Handle
482  */
483 struct GNUNET_STREAM_IOReadHandle
484 {
485   /**
486    * The socket to which this read handle is associated
487    */
488   struct GNUNET_STREAM_Socket *socket;
489   
490   /**
491    * Callback for the read processor
492    */
493   GNUNET_STREAM_DataProcessor proc;
494
495   /**
496    * The closure pointer for the read processor callback
497    */
498   void *proc_cls;
499
500   /**
501    * Task identifier for the read io timeout task
502    */
503   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
504
505   /**
506    * Task scheduled to continue a read operation.
507    */
508   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
509 };
510
511
512 /**
513  * Handle for Shutdown
514  */
515 struct GNUNET_STREAM_ShutdownHandle
516 {
517   /**
518    * The socket associated with this shutdown handle
519    */
520   struct GNUNET_STREAM_Socket *socket;
521
522   /**
523    * Shutdown completion callback
524    */
525   GNUNET_STREAM_ShutdownCompletion completion_cb;
526
527   /**
528    * Closure for completion callback
529    */
530   void *completion_cls;
531
532   /**
533    * Close message retransmission task id
534    */
535   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
536
537   /**
538    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
539    */
540   int operation;  
541 };
542
543
544 /**
545  * Default value in seconds for various timeouts
546  */
547 static const unsigned int default_timeout = 10;
548
549 /**
550  * The domain name for locks we use here
551  */
552 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
553
554
555 /**
556  * Callback function for sending queued message
557  *
558  * @param cls closure the socket
559  * @param size number of bytes available in buf
560  * @param buf where the callee should write the message
561  * @return number of bytes written to buf
562  */
563 static size_t
564 send_message_notify (void *cls, size_t size, void *buf)
565 {
566   struct GNUNET_STREAM_Socket *socket = cls;
567   struct MessageQueue *head;
568   size_t ret;
569
570   socket->transmit_handle = NULL; /* Remove the transmit handle */
571   head = socket->queue_head;
572   if (NULL == head)
573     return 0; /* just to be safe */
574   if (0 == size)                /* request timed out */
575   {
576     socket->retries++;
577     LOG (GNUNET_ERROR_TYPE_DEBUG,
578          "%s: Message sending timed out. Retry %d \n",
579          GNUNET_i2s (&socket->other_peer),
580          socket->retries);
581     socket->transmit_handle = 
582       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
583                                          GNUNET_NO, /* Corking */
584                                          /* FIXME: exponential backoff */
585                                          socket->retransmit_timeout,
586                                          &socket->other_peer,
587                                          ntohs (head->message->header.size),
588                                          &send_message_notify,
589                                          socket);
590     return 0;
591   }
592   ret = ntohs (head->message->header.size);
593   GNUNET_assert (size >= ret);
594   memcpy (buf, head->message, ret);
595   if (NULL != head->finish_cb)
596   {
597     head->finish_cb (head->finish_cb_cls, socket);
598   }
599   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
600                                socket->queue_tail,
601                                head);
602   GNUNET_free (head->message);
603   GNUNET_free (head);
604   head = socket->queue_head;
605   if (NULL != head)    /* more pending messages to send */
606   {
607     socket->retries = 0;
608     socket->transmit_handle = 
609       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
610                                          GNUNET_NO, /* Corking */
611                                          /* FIXME: exponential backoff */
612                                          socket->retransmit_timeout,
613                                          &socket->other_peer,
614                                          ntohs (head->message->header.size),
615                                          &send_message_notify,
616                                          socket);
617   }
618   return ret;
619 }
620
621
622 /**
623  * Queues a message for sending using the mesh connection of a socket
624  *
625  * @param socket the socket whose mesh connection is used
626  * @param message the message to be sent
627  * @param finish_cb the callback to be called when the message is sent
628  * @param finish_cb_cls the closure for the callback
629  * @param urgent set to GNUNET_YES to add the message to the beginning of the
630  *          queue; GNUNET_NO to add at the tail
631  */
632 static void
633 queue_message (struct GNUNET_STREAM_Socket *socket,
634                struct GNUNET_STREAM_MessageHeader *message,
635                SendFinishCallback finish_cb,
636                void *finish_cb_cls,
637                int urgent)
638 {
639   struct MessageQueue *queue_entity;
640
641   GNUNET_assert 
642     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
643      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
644   LOG (GNUNET_ERROR_TYPE_DEBUG,
645        "%s: Queueing message of type %d and size %d\n",
646        GNUNET_i2s (&socket->other_peer),
647        ntohs (message->header.type),
648        ntohs (message->header.size));
649   GNUNET_assert (NULL != message);
650   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
651   queue_entity->message = message;
652   queue_entity->finish_cb = finish_cb;
653   queue_entity->finish_cb_cls = finish_cb_cls;
654   if (GNUNET_YES == urgent)
655   {
656     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
657                                  queue_entity);
658     if (NULL != socket->transmit_handle)
659     {
660       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
661       socket->transmit_handle = NULL;
662     }
663   }
664   else
665     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
666                                       socket->queue_tail,
667                                       queue_entity);
668   if (NULL == socket->transmit_handle)
669   {
670     socket->retries = 0;
671     socket->transmit_handle = 
672         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
673                                            GNUNET_NO, /* Corking */
674                                            socket->retransmit_timeout,
675                                            &socket->other_peer,
676                                            ntohs (message->header.size),
677                                            &send_message_notify,
678                                            socket);
679   }
680 }
681
682
683 /**
684  * Copies a message and queues it for sending using the mesh connection of
685  * given socket 
686  *
687  * @param socket the socket whose mesh connection is used
688  * @param message the message to be sent
689  * @param finish_cb the callback to be called when the message is sent
690  * @param finish_cb_cls the closure for the callback
691  */
692 static void
693 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
694                         const struct GNUNET_STREAM_MessageHeader *message,
695                         SendFinishCallback finish_cb,
696                         void *finish_cb_cls)
697 {
698   struct GNUNET_STREAM_MessageHeader *msg_copy;
699   uint16_t size;
700   
701   size = ntohs (message->header.size);
702   msg_copy = GNUNET_malloc (size);
703   memcpy (msg_copy, message, size);
704   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
705 }
706
707
708 /**
709  * Writes data using the given socket. The amount of data written is limited by
710  * the receiver_window_size
711  *
712  * @param socket the socket to use
713  */
714 static void 
715 write_data (struct GNUNET_STREAM_Socket *socket);
716
717
718 /**
719  * Task for retransmitting data messages if they aren't ACK before their ack
720  * deadline 
721  *
722  * @param cls the socket
723  * @param tc the Task context
724  */
725 static void
726 data_retransmission_task (void *cls,
727                           const struct GNUNET_SCHEDULER_TaskContext *tc)
728 {
729   struct GNUNET_STREAM_Socket *socket = cls;
730   
731   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
732   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
733     return;
734   LOG (GNUNET_ERROR_TYPE_DEBUG,
735        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
736   write_data (socket);
737 }
738
739
740 /**
741  * Task for sending ACK message
742  *
743  * @param cls the socket
744  * @param tc the Task context
745  */
746 static void
747 ack_task (void *cls,
748           const struct GNUNET_SCHEDULER_TaskContext *tc)
749 {
750   struct GNUNET_STREAM_Socket *socket = cls;
751   struct GNUNET_STREAM_AckMessage *ack_msg;
752
753   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
754   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
755     return;
756   /* Create the ACK Message */
757   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
758   ack_msg->header.header.size = htons (sizeof (struct 
759                                                GNUNET_STREAM_AckMessage));
760   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
761   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
762   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
763   ack_msg->receive_window_remaining = 
764     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
765   /* Queue up ACK for immediate sending */
766   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
767 }
768
769
770 /**
771  * Retransmission task for shutdown messages
772  *
773  * @param cls the shutdown handle
774  * @param tc the Task Context
775  */
776 static void
777 close_msg_retransmission_task (void *cls,
778                                const struct GNUNET_SCHEDULER_TaskContext *tc)
779 {
780   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
781   struct GNUNET_STREAM_MessageHeader *msg;
782   struct GNUNET_STREAM_Socket *socket;
783
784   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
785   GNUNET_assert (NULL != shutdown_handle);
786   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
787     return;
788   socket = shutdown_handle->socket;
789   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
790   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
791   switch (shutdown_handle->operation)
792   {
793   case SHUT_RDWR:
794     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
795     break;
796   case SHUT_RD:
797     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
798     break;
799   case SHUT_WR:
800     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
801     break;
802   default:
803     GNUNET_free (msg);
804     shutdown_handle->close_msg_retransmission_task_id = 
805       GNUNET_SCHEDULER_NO_TASK;
806     return;
807   }
808   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
809   shutdown_handle->close_msg_retransmission_task_id =
810     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
811                                   &close_msg_retransmission_task,
812                                   shutdown_handle);
813 }
814
815
816 /**
817  * Function to modify a bit in GNUNET_STREAM_AckBitmap
818  *
819  * @param bitmap the bitmap to modify
820  * @param bit the bit number to modify
821  * @param value GNUNET_YES to on, GNUNET_NO to off
822  */
823 static void
824 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
825                       unsigned int bit, 
826                       int value)
827 {
828   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
829   if (GNUNET_YES == value)
830     *bitmap |= (1LL << bit);
831   else
832     *bitmap &= ~(1LL << bit);
833 }
834
835
836 /**
837  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
838  *
839  * @param bitmap address of the bitmap that has to be checked
840  * @param bit the bit number to check
841  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
842  */
843 static uint8_t
844 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
845                       unsigned int bit)
846 {
847   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
848   return 0 != (*bitmap & (1LL << bit));
849 }
850
851
852 /**
853  * Writes data using the given socket. The amount of data written is limited by
854  * the receiver_window_size
855  *
856  * @param socket the socket to use
857  */
858 static void 
859 write_data (struct GNUNET_STREAM_Socket *socket)
860 {
861   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
862   unsigned int packet;
863   
864   for (packet=0; packet < io_handle->packets_sent; packet++)
865   {
866     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
867                                            packet))
868     {
869       LOG (GNUNET_ERROR_TYPE_DEBUG,
870            "%s: Retransmitting DATA message with sequence %u\n",
871            GNUNET_i2s (&socket->other_peer),
872            ntohl (io_handle->messages[packet]->sequence_number));
873       copy_and_queue_message (socket,
874                               &io_handle->messages[packet]->header,
875                               NULL,
876                               NULL);
877     }
878   }
879   /* Now send new packets if there is enough buffer space */
880   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
881          (NULL != io_handle->messages[packet]) &&
882          (socket->receiver_window_available 
883           >= ntohs (io_handle->messages[packet]->header.header.size)))
884   {
885     socket->receiver_window_available -= 
886       ntohs (io_handle->messages[packet]->header.header.size);
887     LOG (GNUNET_ERROR_TYPE_DEBUG,
888          "%s: Placing DATA message with sequence %u in send queue\n",
889          GNUNET_i2s (&socket->other_peer),
890          ntohl (io_handle->messages[packet]->sequence_number));
891     copy_and_queue_message (socket,
892                             &io_handle->messages[packet]->header,
893                             NULL,
894                             NULL);
895     packet++;
896   }
897   io_handle->packets_sent = packet;
898   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
899   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
900     socket->data_retransmission_task_id = 
901       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
902                                     (GNUNET_TIME_UNIT_SECONDS, 8),
903                                     &data_retransmission_task,
904                                     socket);
905 }
906
907
908 /**
909  * Task for calling the read processor
910  *
911  * @param cls the socket
912  * @param tc the task context
913  */
914 static void
915 call_read_processor (void *cls,
916                      const struct GNUNET_SCHEDULER_TaskContext *tc)
917 {
918   struct GNUNET_STREAM_Socket *socket = cls;
919   struct GNUNET_STREAM_IOReadHandle *read_handle;
920   size_t read_size;
921   size_t valid_read_size;
922   unsigned int packet;
923   uint32_t sequence_increase;
924   uint32_t offset_increase;
925
926   read_handle = socket->read_handle;
927   GNUNET_assert (NULL != read_handle);
928   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
929   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
930     return;
931   if (NULL == socket->receive_buffer) 
932     return;
933   GNUNET_assert (NULL != socket->read_handle);
934   GNUNET_assert (NULL != socket->read_handle->proc);
935   /* Check the bitmap for any holes */
936   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
937   {
938     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
939                                            packet))
940       break;
941   }
942   /* We only call read processor if we have the first packet */
943   GNUNET_assert (0 < packet);
944   valid_read_size = 
945     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
946   GNUNET_assert (0 != valid_read_size);
947   /* Cancel the read_io_timeout_task */
948   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
949   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
950   /* Call the data processor */
951   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
952        GNUNET_i2s (&socket->other_peer));
953   read_size = 
954       socket->read_handle->proc (socket->read_handle->proc_cls,
955                                  socket->status,
956                                  socket->receive_buffer + socket->copy_offset,
957                                  valid_read_size);
958   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
959        GNUNET_i2s (&socket->other_peer), read_size);
960   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
961        GNUNET_i2s (&socket->other_peer));
962   /* Free the read handle */
963   GNUNET_free (socket->read_handle);
964   socket->read_handle = NULL;
965   GNUNET_assert (read_size <= valid_read_size);
966   socket->copy_offset += read_size;
967   /* Determine upto which packet we can remove from the buffer */
968   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
969   {
970     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
971     { 
972       packet++; 
973       break;
974     }
975     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
976       break;
977   }
978   /* If no packets can be removed we can't move the buffer */
979   if (0 == packet) 
980     return;
981   sequence_increase = packet;
982   LOG (GNUNET_ERROR_TYPE_DEBUG,
983        "%s: Sequence increase after read processor completion: %u\n",
984        GNUNET_i2s (&socket->other_peer), sequence_increase);
985   /* Shift the data in the receive buffer */
986   socket->receive_buffer = 
987     memmove (socket->receive_buffer,
988              socket->receive_buffer 
989              + socket->receive_buffer_boundaries[sequence_increase-1],
990              socket->receive_buffer_size
991              - socket->receive_buffer_boundaries[sequence_increase-1]);
992   /* Shift the bitmap */
993   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
994   /* Set read_sequence_number */
995   socket->read_sequence_number += sequence_increase;
996   /* Set read_offset */
997   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
998   socket->read_offset += offset_increase;
999   /* Fix copy_offset */
1000   GNUNET_assert (offset_increase <= socket->copy_offset);
1001   socket->copy_offset -= offset_increase;
1002   /* Fix relative boundaries */
1003   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1004   {
1005     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1006     {
1007       uint32_t ahead_buffer_boundary;
1008
1009       ahead_buffer_boundary = 
1010         socket->receive_buffer_boundaries[packet + sequence_increase];
1011       if (0 == ahead_buffer_boundary)
1012         socket->receive_buffer_boundaries[packet] = 0;
1013       else
1014       {
1015         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1016         socket->receive_buffer_boundaries[packet] = 
1017           ahead_buffer_boundary - offset_increase;
1018       }
1019     }
1020     else
1021       socket->receive_buffer_boundaries[packet] = 0;
1022   }
1023 }
1024
1025
1026 /**
1027  * Cancels the existing read io handle
1028  *
1029  * @param cls the closure from the SCHEDULER call
1030  * @param tc the task context
1031  */
1032 static void
1033 read_io_timeout (void *cls,
1034                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1035 {
1036   struct GNUNET_STREAM_Socket *socket = cls;
1037   struct GNUNET_STREAM_IOReadHandle *read_handle;
1038   GNUNET_STREAM_DataProcessor proc;
1039   void *proc_cls;
1040
1041   read_handle = socket->read_handle;
1042   GNUNET_assert (NULL != read_handle);
1043   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1044   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1045     return;
1046   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1047   {
1048     LOG (GNUNET_ERROR_TYPE_DEBUG,
1049          "%s: Read task timedout - Cancelling it\n",
1050          GNUNET_i2s (&socket->other_peer));
1051     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1052     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1053   }
1054   proc = read_handle->proc;
1055   proc_cls = read_handle->proc_cls;
1056   GNUNET_free (read_handle);
1057   socket->read_handle = NULL;
1058   /* Call the read processor to signal timeout */
1059   proc (proc_cls,
1060         GNUNET_STREAM_TIMEOUT,
1061         NULL,
1062         0);
1063 }
1064
1065
1066 /**
1067  * Handler for DATA messages; Same for both client and server
1068  *
1069  * @param socket the socket through which the ack was received
1070  * @param tunnel connection to the other end
1071  * @param sender who sent the message
1072  * @param msg the data message
1073  * @param atsi performance data for the connection
1074  * @return GNUNET_OK to keep the connection open,
1075  *         GNUNET_SYSERR to close it (signal serious error)
1076  */
1077 static int
1078 handle_data (struct GNUNET_STREAM_Socket *socket,
1079              struct GNUNET_MESH_Tunnel *tunnel,
1080              const struct GNUNET_PeerIdentity *sender,
1081              const struct GNUNET_STREAM_DataMessage *msg,
1082              const struct GNUNET_ATS_Information*atsi)
1083 {
1084   const void *payload;
1085   struct GNUNET_TIME_Relative ack_deadline_rel;
1086   uint32_t bytes_needed;
1087   uint32_t relative_offset;
1088   uint32_t relative_sequence_number;
1089   uint16_t size;
1090
1091   size = htons (msg->header.header.size);
1092   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1093   {
1094     GNUNET_break_op (0);
1095     return GNUNET_SYSERR;
1096   }
1097   if (0 != memcmp (sender, &socket->other_peer,
1098                    sizeof (struct GNUNET_PeerIdentity)))
1099   {
1100     LOG (GNUNET_ERROR_TYPE_DEBUG,
1101          "%s: Received DATA from non-confirming peer\n",
1102          GNUNET_i2s (&socket->other_peer));
1103     return GNUNET_YES;
1104   }
1105   switch (socket->state)
1106   {
1107   case STATE_ESTABLISHED:
1108   case STATE_TRANSMIT_CLOSED:
1109   case STATE_TRANSMIT_CLOSE_WAIT:      
1110     /* check if the message's sequence number is in the range we are
1111        expecting */
1112     relative_sequence_number = 
1113       ntohl (msg->sequence_number) - socket->read_sequence_number;
1114     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1115     {
1116       LOG (GNUNET_ERROR_TYPE_DEBUG,
1117            "%s: Ignoring received message with sequence number %u\n",
1118            GNUNET_i2s (&socket->other_peer),
1119            ntohl (msg->sequence_number));
1120       /* Start ACK sending task if one is not already present */
1121       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1122       {
1123         socket->ack_task_id = 
1124           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1125                                         (msg->ack_deadline),
1126                                         &ack_task,
1127                                         socket);
1128       }
1129       return GNUNET_YES;
1130     }      
1131     /* Check if we have already seen this message */
1132     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1133                                             relative_sequence_number))
1134     {
1135       LOG (GNUNET_ERROR_TYPE_DEBUG,
1136            "%s: Ignoring already received message with sequence number %u\n",
1137            GNUNET_i2s (&socket->other_peer),
1138            ntohl (msg->sequence_number));
1139       /* Start ACK sending task if one is not already present */
1140       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1141       {
1142         socket->ack_task_id = 
1143           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1144                                         (msg->ack_deadline), &ack_task, socket);
1145       }
1146       return GNUNET_YES;
1147     }
1148     LOG (GNUNET_ERROR_TYPE_DEBUG,
1149          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1150          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1151          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1152     /* Check if we have to allocate the buffer */
1153     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1154     relative_offset = ntohl (msg->offset) - socket->read_offset;
1155     bytes_needed = relative_offset + size;
1156     if (bytes_needed > socket->receive_buffer_size)
1157     {
1158       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1159       {
1160         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1161                                                  bytes_needed);
1162         socket->receive_buffer_size = bytes_needed;
1163       }
1164       else
1165       {
1166         LOG (GNUNET_ERROR_TYPE_DEBUG,
1167              "%s: Cannot accommodate packet %d as buffer is full\n",
1168              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1169         return GNUNET_YES;
1170       }
1171     }
1172     /* Copy Data to buffer */
1173     payload = &msg[1];
1174     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1175     memcpy (socket->receive_buffer + relative_offset, payload, size);
1176     socket->receive_buffer_boundaries[relative_sequence_number] = 
1177         relative_offset + size;
1178     /* Modify the ACK bitmap */
1179     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1180                           GNUNET_YES);
1181     /* Start ACK sending task if one is not already present */
1182     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1183     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1184     {
1185       ack_deadline_rel = 
1186           GNUNET_TIME_relative_min (ack_deadline_rel,
1187                                     GNUNET_TIME_relative_multiply
1188                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1189       socket->ack_task_id = 
1190           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1191                                         (msg->ack_deadline), &ack_task, socket);
1192       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1193       socket->ack_time_deadline = ack_deadline_rel;
1194     }
1195     else
1196     {
1197       struct GNUNET_TIME_Relative ack_time_past;
1198       struct GNUNET_TIME_Relative ack_time_remaining;
1199       struct GNUNET_TIME_Relative ack_time_min;
1200       ack_time_past = 
1201           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1202       ack_time_remaining = GNUNET_TIME_relative_subtract
1203           (socket->ack_time_deadline, ack_time_past);
1204       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1205                                                ack_deadline_rel);
1206       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1207                       sizeof (struct GNUNET_TIME_Relative)))
1208       {
1209         ack_deadline_rel = ack_time_min;
1210         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1211         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1212                                                             &ack_task, socket);
1213         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1214         socket->ack_time_deadline = ack_deadline_rel;
1215       }
1216     }
1217     if ((NULL != socket->read_handle) /* A read handle is waiting */
1218         /* There is no current read task */
1219         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1220         /* We have the first packet */
1221         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1222     {
1223       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1224            GNUNET_i2s (&socket->other_peer));
1225       socket->read_handle->read_task_id =
1226           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1227     }
1228     break;
1229   default:
1230     LOG (GNUNET_ERROR_TYPE_DEBUG,
1231          "%s: Received data message when it cannot be handled\n",
1232          GNUNET_i2s (&socket->other_peer));
1233     break;
1234   }
1235   return GNUNET_YES;
1236 }
1237
1238
1239 /**
1240  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1241  *
1242  * @param cls the socket (set from GNUNET_MESH_connect)
1243  * @param tunnel connection to the other end
1244  * @param tunnel_ctx place to store local state associated with the tunnel
1245  * @param sender who sent the message
1246  * @param message the actual message
1247  * @param atsi performance data for the connection
1248  * @return GNUNET_OK to keep the connection open,
1249  *         GNUNET_SYSERR to close it (signal serious error)
1250  */
1251 static int
1252 client_handle_data (void *cls,
1253                     struct GNUNET_MESH_Tunnel *tunnel,
1254                     void **tunnel_ctx,
1255                     const struct GNUNET_PeerIdentity *sender,
1256                     const struct GNUNET_MessageHeader *message,
1257                     const struct GNUNET_ATS_Information*atsi)
1258 {
1259   struct GNUNET_STREAM_Socket *socket = cls;
1260
1261   return handle_data (socket, tunnel, sender, 
1262                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1263 }
1264
1265
1266 /**
1267  * Callback to set state to ESTABLISHED
1268  *
1269  * @param cls the closure NULL;
1270  * @param socket the socket to requiring state change
1271  */
1272 static void
1273 set_state_established (void *cls,
1274                        struct GNUNET_STREAM_Socket *socket)
1275 {
1276   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1277        "%s: Attaining ESTABLISHED state\n",
1278        GNUNET_i2s (&socket->other_peer));
1279   socket->write_offset = 0;
1280   socket->read_offset = 0;
1281   socket->state = STATE_ESTABLISHED;
1282   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1283                  socket->control_retransmission_task_id);
1284   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1285   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1286   if (NULL != socket->lsocket)
1287   {
1288     LOG (GNUNET_ERROR_TYPE_DEBUG,
1289          "%s: Calling listen callback\n",
1290          GNUNET_i2s (&socket->other_peer));
1291     if (GNUNET_SYSERR == 
1292         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1293                                     socket,
1294                                     &socket->other_peer))
1295     {
1296       socket->state = STATE_CLOSED;
1297       /* FIXME: We should close in a decent way (send RST) */
1298       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1299       GNUNET_free (socket);
1300     }
1301   }
1302   else
1303     socket->open_cb (socket->open_cls, socket);
1304 }
1305
1306
1307 /**
1308  * Callback to set state to HELLO_WAIT
1309  *
1310  * @param cls the closure from queue_message
1311  * @param socket the socket to requiring state change
1312  */
1313 static void
1314 set_state_hello_wait (void *cls,
1315                       struct GNUNET_STREAM_Socket *socket)
1316 {
1317   GNUNET_assert (STATE_INIT == socket->state);
1318   LOG (GNUNET_ERROR_TYPE_DEBUG,
1319        "%s: Attaining HELLO_WAIT state\n",
1320        GNUNET_i2s (&socket->other_peer));
1321   socket->state = STATE_HELLO_WAIT;
1322 }
1323
1324
1325 /**
1326  * Callback to set state to CLOSE_WAIT
1327  *
1328  * @param cls the closure from queue_message
1329  * @param socket the socket requiring state change
1330  */
1331 static void
1332 set_state_close_wait (void *cls,
1333                       struct GNUNET_STREAM_Socket *socket)
1334 {
1335   LOG (GNUNET_ERROR_TYPE_DEBUG,
1336        "%s: Attaing CLOSE_WAIT state\n",
1337        GNUNET_i2s (&socket->other_peer));
1338   socket->state = STATE_CLOSE_WAIT;
1339   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1340   socket->receive_buffer = NULL;
1341   socket->receive_buffer_size = 0;
1342 }
1343
1344
1345 /**
1346  * Callback to set state to RECEIVE_CLOSE_WAIT
1347  *
1348  * @param cls the closure from queue_message
1349  * @param socket the socket requiring state change
1350  */
1351 static void
1352 set_state_receive_close_wait (void *cls,
1353                               struct GNUNET_STREAM_Socket *socket)
1354 {
1355   LOG (GNUNET_ERROR_TYPE_DEBUG,
1356        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1357        GNUNET_i2s (&socket->other_peer));
1358   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1359   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1360   socket->receive_buffer = NULL;
1361   socket->receive_buffer_size = 0;
1362 }
1363
1364
1365 /**
1366  * Callback to set state to TRANSMIT_CLOSE_WAIT
1367  *
1368  * @param cls the closure from queue_message
1369  * @param socket the socket requiring state change
1370  */
1371 static void
1372 set_state_transmit_close_wait (void *cls,
1373                                struct GNUNET_STREAM_Socket *socket)
1374 {
1375   LOG (GNUNET_ERROR_TYPE_DEBUG,
1376        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1377        GNUNET_i2s (&socket->other_peer));
1378   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1379 }
1380
1381
1382 /**
1383  * Callback to set state to CLOSED
1384  *
1385  * @param cls the closure from queue_message
1386  * @param socket the socket requiring state change
1387  */
1388 static void
1389 set_state_closed (void *cls,
1390                   struct GNUNET_STREAM_Socket *socket)
1391 {
1392   socket->state = STATE_CLOSED;
1393 }
1394
1395
1396 /**
1397  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1398  *
1399  * @return the generate hello message
1400  */
1401 static struct GNUNET_STREAM_MessageHeader *
1402 generate_hello (void)
1403 {
1404   struct GNUNET_STREAM_MessageHeader *msg;
1405
1406   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1407   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1408   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1409   return msg;
1410 }
1411
1412
1413 /**
1414  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1415  * socket
1416  *
1417  * @param socket the socket for which this HelloAckMessage has to be generated
1418  * @param generate_seq GNUNET_YES to generate the write sequence number,
1419  *          GNUNET_NO to use the existing sequence number
1420  * @return the HelloAckMessage
1421  */
1422 static struct GNUNET_STREAM_HelloAckMessage *
1423 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1424                     int generate_seq)
1425 {
1426   struct GNUNET_STREAM_HelloAckMessage *msg;
1427
1428   if (GNUNET_YES == generate_seq)
1429   {
1430     if (GNUNET_YES == socket->testing_active)
1431       socket->write_sequence_number =
1432         socket->testing_set_write_sequence_number_value;
1433     else
1434       socket->write_sequence_number = 
1435         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1436     LOG_DEBUG ("%s: write sequence number %u\n",
1437                GNUNET_i2s (&socket->other_peer),
1438                (unsigned int) socket->write_sequence_number);
1439   }
1440   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1441   msg->header.header.size = 
1442     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1443   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1444   msg->sequence_number = htonl (socket->write_sequence_number);
1445   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1446   return msg;
1447 }
1448
1449
1450 /**
1451  * Task for retransmitting control messages if they aren't ACK'ed before a
1452  * deadline
1453  *
1454  * @param cls the socket
1455  * @param tc the Task context
1456  */
1457 static void
1458 control_retransmission_task (void *cls,
1459                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1460 {
1461   struct GNUNET_STREAM_Socket *socket = cls;
1462     
1463   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1464   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1465     return;
1466   LOG_DEBUG ("%s: Retransmitting a control message\n",
1467                  GNUNET_i2s (&socket->other_peer));
1468   switch (socket->state)
1469   {
1470   case STATE_INIT:    
1471     GNUNET_break (0);
1472     break;
1473   case STATE_LISTEN:
1474     GNUNET_break (0);
1475     break;
1476   case STATE_HELLO_WAIT:
1477     if (NULL == socket->lsocket) /* We are client */
1478       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1479     else
1480       queue_message (socket,
1481                      (struct GNUNET_STREAM_MessageHeader *)
1482                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1483                      GNUNET_NO);
1484     socket->control_retransmission_task_id =
1485     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1486                                   &control_retransmission_task, socket);
1487     break;
1488   case STATE_ESTABLISHED:
1489     if (NULL == socket->lsocket)
1490       queue_message (socket,
1491                      (struct GNUNET_STREAM_MessageHeader *)
1492                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1493                      GNUNET_NO);
1494     else
1495       GNUNET_break (0);
1496     break;
1497   default:
1498     GNUNET_break (0);
1499   }  
1500 }
1501
1502
1503 /**
1504  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1505  *
1506  * @param cls the socket (set from GNUNET_MESH_connect)
1507  * @param tunnel connection to the other end
1508  * @param tunnel_ctx this is NULL
1509  * @param sender who sent the message
1510  * @param message the actual message
1511  * @param atsi performance data for the connection
1512  * @return GNUNET_OK to keep the connection open,
1513  *         GNUNET_SYSERR to close it (signal serious error)
1514  */
1515 static int
1516 client_handle_hello_ack (void *cls,
1517                          struct GNUNET_MESH_Tunnel *tunnel,
1518                          void **tunnel_ctx,
1519                          const struct GNUNET_PeerIdentity *sender,
1520                          const struct GNUNET_MessageHeader *message,
1521                          const struct GNUNET_ATS_Information*atsi)
1522 {
1523   struct GNUNET_STREAM_Socket *socket = cls;
1524   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1525   struct GNUNET_STREAM_HelloAckMessage *reply;
1526
1527   if (0 != memcmp (sender, &socket->other_peer,
1528                    sizeof (struct GNUNET_PeerIdentity)))
1529   {
1530     LOG (GNUNET_ERROR_TYPE_DEBUG,
1531          "%s: Received HELLO_ACK from non-confirming peer\n",
1532          GNUNET_i2s (&socket->other_peer));
1533     return GNUNET_YES;
1534   }
1535   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1536   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1537        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1538   GNUNET_assert (socket->tunnel == tunnel);
1539   switch (socket->state)
1540   {
1541   case STATE_HELLO_WAIT:
1542     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1543     LOG (GNUNET_ERROR_TYPE_DEBUG,
1544          "%s: Read sequence number %u\n",
1545          GNUNET_i2s (&socket->other_peer),
1546          (unsigned int) socket->read_sequence_number);
1547     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1548     reply = generate_hello_ack (socket, GNUNET_YES);
1549     queue_message (socket, &reply->header, &set_state_established,
1550                    NULL, GNUNET_NO);    
1551     return GNUNET_OK;
1552   case STATE_ESTABLISHED:
1553     // call statistics (# ACKs ignored++)
1554     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1555                    socket->control_retransmission_task_id);
1556     socket->control_retransmission_task_id =
1557       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1558     return GNUNET_OK;
1559   default:
1560     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1561                GNUNET_i2s (&socket->other_peer),
1562                GNUNET_i2s (&socket->other_peer), socket->state);
1563     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1564     return GNUNET_SYSERR;
1565   }
1566 }
1567
1568
1569 /**
1570  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1571  *
1572  * @param cls the socket (set from GNUNET_MESH_connect)
1573  * @param tunnel connection to the other end
1574  * @param tunnel_ctx this is NULL
1575  * @param sender who sent the message
1576  * @param message the actual message
1577  * @param atsi performance data for the connection
1578  * @return GNUNET_OK to keep the connection open,
1579  *         GNUNET_SYSERR to close it (signal serious error)
1580  */
1581 static int
1582 client_handle_reset (void *cls,
1583                      struct GNUNET_MESH_Tunnel *tunnel,
1584                      void **tunnel_ctx,
1585                      const struct GNUNET_PeerIdentity *sender,
1586                      const struct GNUNET_MessageHeader *message,
1587                      const struct GNUNET_ATS_Information*atsi)
1588 {
1589   // struct GNUNET_STREAM_Socket *socket = cls;
1590
1591   return GNUNET_OK;
1592 }
1593
1594
1595 /**
1596  * Common message handler for handling TRANSMIT_CLOSE messages
1597  *
1598  * @param socket the socket through which the ack was received
1599  * @param tunnel connection to the other end
1600  * @param sender who sent the message
1601  * @param msg the transmit close message
1602  * @param atsi performance data for the connection
1603  * @return GNUNET_OK to keep the connection open,
1604  *         GNUNET_SYSERR to close it (signal serious error)
1605  */
1606 static int
1607 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1608                        struct GNUNET_MESH_Tunnel *tunnel,
1609                        const struct GNUNET_PeerIdentity *sender,
1610                        const struct GNUNET_STREAM_MessageHeader *msg,
1611                        const struct GNUNET_ATS_Information*atsi)
1612 {
1613   struct GNUNET_STREAM_MessageHeader *reply;
1614
1615   switch (socket->state)
1616   {
1617   case STATE_ESTABLISHED:
1618     socket->state = STATE_RECEIVE_CLOSED;
1619     /* Send TRANSMIT_CLOSE_ACK */
1620     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1621     reply->header.type = 
1622       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1623     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1624     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1625     break;
1626   default:
1627     /* FIXME: Call statistics? */
1628     break;
1629   }
1630   return GNUNET_YES;
1631 }
1632
1633
1634 /**
1635  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1636  *
1637  * @param cls the socket (set from GNUNET_MESH_connect)
1638  * @param tunnel connection to the other end
1639  * @param tunnel_ctx this is NULL
1640  * @param sender who sent the message
1641  * @param message the actual message
1642  * @param atsi performance data for the connection
1643  * @return GNUNET_OK to keep the connection open,
1644  *         GNUNET_SYSERR to close it (signal serious error)
1645  */
1646 static int
1647 client_handle_transmit_close (void *cls,
1648                               struct GNUNET_MESH_Tunnel *tunnel,
1649                               void **tunnel_ctx,
1650                               const struct GNUNET_PeerIdentity *sender,
1651                               const struct GNUNET_MessageHeader *message,
1652                               const struct GNUNET_ATS_Information*atsi)
1653 {
1654   struct GNUNET_STREAM_Socket *socket = cls;
1655   
1656   return handle_transmit_close (socket,
1657                                 tunnel,
1658                                 sender,
1659                                 (struct GNUNET_STREAM_MessageHeader *)message,
1660                                 atsi);
1661 }
1662
1663
1664 /**
1665  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1666  *
1667  * @param socket the socket
1668  * @param tunnel connection to the other end
1669  * @param sender who sent the message
1670  * @param message the actual message
1671  * @param atsi performance data for the connection
1672  * @param operation the close operation which is being ACK'ed
1673  * @return GNUNET_OK to keep the connection open,
1674  *         GNUNET_SYSERR to close it (signal serious error)
1675  */
1676 static int
1677 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1678                           struct GNUNET_MESH_Tunnel *tunnel,
1679                           const struct GNUNET_PeerIdentity *sender,
1680                           const struct GNUNET_STREAM_MessageHeader *message,
1681                           const struct GNUNET_ATS_Information *atsi,
1682                           int operation)
1683 {
1684   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1685
1686   shutdown_handle = socket->shutdown_handle;
1687   if (NULL == shutdown_handle)
1688   {
1689     LOG (GNUNET_ERROR_TYPE_DEBUG,
1690          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1691          GNUNET_i2s (&socket->other_peer));
1692     return GNUNET_OK;
1693   }
1694   switch (operation)
1695   {
1696   case SHUT_RDWR:
1697     switch (socket->state)
1698     {
1699     case STATE_CLOSE_WAIT:
1700       if (SHUT_RDWR != shutdown_handle->operation)
1701       {
1702         LOG (GNUNET_ERROR_TYPE_DEBUG,
1703              "%s: Received CLOSE_ACK when shutdown handle is not for "
1704              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1705         return GNUNET_OK;
1706       }
1707       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1708            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1709       socket->state = STATE_CLOSED;
1710       break;
1711     default:
1712       LOG (GNUNET_ERROR_TYPE_DEBUG,
1713            "%s: Received CLOSE_ACK when in it not expected\n",
1714            GNUNET_i2s (&socket->other_peer));
1715       return GNUNET_OK;
1716     }
1717     break;
1718   case SHUT_RD:
1719     switch (socket->state)
1720     {
1721     case STATE_RECEIVE_CLOSE_WAIT:
1722       if (SHUT_RD != shutdown_handle->operation)
1723       {
1724         LOG (GNUNET_ERROR_TYPE_DEBUG,
1725              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1726              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1727         return GNUNET_OK;
1728       }
1729       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1730            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1731       socket->state = STATE_RECEIVE_CLOSED;
1732       break;
1733     default:
1734       LOG (GNUNET_ERROR_TYPE_DEBUG,
1735            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1736            GNUNET_i2s (&socket->other_peer));
1737       return GNUNET_OK;
1738     }
1739     break;
1740   case SHUT_WR:
1741     switch (socket->state)
1742     {
1743     case STATE_TRANSMIT_CLOSE_WAIT:
1744       if (SHUT_WR != shutdown_handle->operation)
1745       {
1746         LOG (GNUNET_ERROR_TYPE_DEBUG,
1747              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1748              "is not for SHUT_WR\n",
1749              GNUNET_i2s (&socket->other_peer));
1750         return GNUNET_OK;
1751       }
1752       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1753            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1754       socket->state = STATE_TRANSMIT_CLOSED;
1755       break;
1756     default:
1757       LOG (GNUNET_ERROR_TYPE_DEBUG,
1758            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1759            GNUNET_i2s (&socket->other_peer));          
1760       return GNUNET_OK;
1761     }
1762     break;
1763   default:
1764     GNUNET_assert (0);
1765   }
1766   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1767     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1768                                    operation);
1769   if (GNUNET_SCHEDULER_NO_TASK
1770       != shutdown_handle->close_msg_retransmission_task_id)
1771   {
1772     GNUNET_SCHEDULER_cancel
1773       (shutdown_handle->close_msg_retransmission_task_id);
1774     shutdown_handle->close_msg_retransmission_task_id =
1775         GNUNET_SCHEDULER_NO_TASK;
1776   }
1777   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1778   socket->shutdown_handle = NULL;
1779   return GNUNET_OK;
1780 }
1781
1782
1783 /**
1784  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1785  *
1786  * @param cls the socket (set from GNUNET_MESH_connect)
1787  * @param tunnel connection to the other end
1788  * @param tunnel_ctx this is NULL
1789  * @param sender who sent the message
1790  * @param message the actual message
1791  * @param atsi performance data for the connection
1792  * @return GNUNET_OK to keep the connection open,
1793  *         GNUNET_SYSERR to close it (signal serious error)
1794  */
1795 static int
1796 client_handle_transmit_close_ack (void *cls,
1797                                   struct GNUNET_MESH_Tunnel *tunnel,
1798                                   void **tunnel_ctx,
1799                                   const struct GNUNET_PeerIdentity *sender,
1800                                   const struct GNUNET_MessageHeader *message,
1801                                   const struct GNUNET_ATS_Information*atsi)
1802 {
1803   struct GNUNET_STREAM_Socket *socket = cls;
1804
1805   return handle_generic_close_ack (socket,
1806                                    tunnel,
1807                                    sender,
1808                                    (const struct GNUNET_STREAM_MessageHeader *)
1809                                    message,
1810                                    atsi,
1811                                    SHUT_WR);
1812 }
1813
1814
1815 /**
1816  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1817  *
1818  * @param socket the socket
1819  * @param tunnel connection to the other end
1820  * @param sender who sent the message
1821  * @param message the actual message
1822  * @param atsi performance data for the connection
1823  * @return GNUNET_OK to keep the connection open,
1824  *         GNUNET_SYSERR to close it (signal serious error)
1825  */
1826 static int
1827 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1828                       struct GNUNET_MESH_Tunnel *tunnel,
1829                       const struct GNUNET_PeerIdentity *sender,
1830                       const struct GNUNET_STREAM_MessageHeader *message,
1831                       const struct GNUNET_ATS_Information *atsi)
1832 {
1833   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1834
1835   switch (socket->state)
1836   {
1837   case STATE_INIT:
1838   case STATE_LISTEN:
1839   case STATE_HELLO_WAIT:
1840     LOG (GNUNET_ERROR_TYPE_DEBUG,
1841          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1842          GNUNET_i2s (&socket->other_peer));
1843     return GNUNET_OK;
1844   default:
1845     break;
1846   }
1847   
1848   LOG (GNUNET_ERROR_TYPE_DEBUG,
1849        "%s: Received RECEIVE_CLOSE from %s\n",
1850        GNUNET_i2s (&socket->other_peer),
1851        GNUNET_i2s (&socket->other_peer));
1852   receive_close_ack =
1853     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1854   receive_close_ack->header.size =
1855     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1856   receive_close_ack->header.type =
1857     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1858   queue_message (socket, receive_close_ack, &set_state_closed,
1859                  NULL, GNUNET_NO);  
1860   /* FIXME: Handle the case where write handle is present; the write operation
1861      should be deemed as finised and the write continuation callback
1862      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1863   return GNUNET_OK;
1864 }
1865
1866
1867 /**
1868  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1869  *
1870  * @param cls the socket (set from GNUNET_MESH_connect)
1871  * @param tunnel connection to the other end
1872  * @param tunnel_ctx this is NULL
1873  * @param sender who sent the message
1874  * @param message the actual message
1875  * @param atsi performance data for the connection
1876  * @return GNUNET_OK to keep the connection open,
1877  *         GNUNET_SYSERR to close it (signal serious error)
1878  */
1879 static int
1880 client_handle_receive_close (void *cls,
1881                              struct GNUNET_MESH_Tunnel *tunnel,
1882                              void **tunnel_ctx,
1883                              const struct GNUNET_PeerIdentity *sender,
1884                              const struct GNUNET_MessageHeader *message,
1885                              const struct GNUNET_ATS_Information*atsi)
1886 {
1887   struct GNUNET_STREAM_Socket *socket = cls;
1888
1889   return
1890     handle_receive_close (socket,
1891                           tunnel,
1892                           sender,
1893                           (const struct GNUNET_STREAM_MessageHeader *) message,
1894                           atsi);
1895 }
1896
1897
1898 /**
1899  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1900  *
1901  * @param cls the socket (set from GNUNET_MESH_connect)
1902  * @param tunnel connection to the other end
1903  * @param tunnel_ctx this is NULL
1904  * @param sender who sent the message
1905  * @param message the actual message
1906  * @param atsi performance data for the connection
1907  * @return GNUNET_OK to keep the connection open,
1908  *         GNUNET_SYSERR to close it (signal serious error)
1909  */
1910 static int
1911 client_handle_receive_close_ack (void *cls,
1912                                  struct GNUNET_MESH_Tunnel *tunnel,
1913                                  void **tunnel_ctx,
1914                                  const struct GNUNET_PeerIdentity *sender,
1915                                  const struct GNUNET_MessageHeader *message,
1916                                  const struct GNUNET_ATS_Information*atsi)
1917 {
1918   struct GNUNET_STREAM_Socket *socket = cls;
1919
1920   return handle_generic_close_ack (socket,
1921                                    tunnel,
1922                                    sender,
1923                                    (const struct GNUNET_STREAM_MessageHeader *)
1924                                    message,
1925                                    atsi,
1926                                    SHUT_RD);
1927 }
1928
1929
1930 /**
1931  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1932  *
1933  * @param socket the socket
1934  * @param tunnel connection to the other end
1935  * @param sender who sent the message
1936  * @param message the actual message
1937  * @param atsi performance data for the connection
1938  * @return GNUNET_OK to keep the connection open,
1939  *         GNUNET_SYSERR to close it (signal serious error)
1940  */
1941 static int
1942 handle_close (struct GNUNET_STREAM_Socket *socket,
1943               struct GNUNET_MESH_Tunnel *tunnel,
1944               const struct GNUNET_PeerIdentity *sender,
1945               const struct GNUNET_STREAM_MessageHeader *message,
1946               const struct GNUNET_ATS_Information*atsi)
1947 {
1948   struct GNUNET_STREAM_MessageHeader *close_ack;
1949
1950   switch (socket->state)
1951   {
1952   case STATE_INIT:
1953   case STATE_LISTEN:
1954   case STATE_HELLO_WAIT:
1955     LOG (GNUNET_ERROR_TYPE_DEBUG,
1956          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1957          GNUNET_i2s (&socket->other_peer));
1958     return GNUNET_OK;
1959   default:
1960     break;
1961   }
1962
1963   LOG (GNUNET_ERROR_TYPE_DEBUG,
1964        "%s: Received CLOSE from %s\n",
1965        GNUNET_i2s (&socket->other_peer),
1966        GNUNET_i2s (&socket->other_peer));
1967   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1968   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1969   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1970   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1971   if (socket->state == STATE_CLOSED)
1972     return GNUNET_OK;
1973
1974   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1975   socket->receive_buffer = NULL;
1976   socket->receive_buffer_size = 0;
1977   return GNUNET_OK;
1978 }
1979
1980
1981 /**
1982  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1983  *
1984  * @param cls the socket (set from GNUNET_MESH_connect)
1985  * @param tunnel connection to the other end
1986  * @param tunnel_ctx this is NULL
1987  * @param sender who sent the message
1988  * @param message the actual message
1989  * @param atsi performance data for the connection
1990  * @return GNUNET_OK to keep the connection open,
1991  *         GNUNET_SYSERR to close it (signal serious error)
1992  */
1993 static int
1994 client_handle_close (void *cls,
1995                      struct GNUNET_MESH_Tunnel *tunnel,
1996                      void **tunnel_ctx,
1997                      const struct GNUNET_PeerIdentity *sender,
1998                      const struct GNUNET_MessageHeader *message,
1999                      const struct GNUNET_ATS_Information*atsi)
2000 {
2001   struct GNUNET_STREAM_Socket *socket = cls;
2002
2003   return handle_close (socket,
2004                        tunnel,
2005                        sender,
2006                        (const struct GNUNET_STREAM_MessageHeader *) message,
2007                        atsi);
2008 }
2009
2010
2011 /**
2012  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2013  *
2014  * @param cls the socket (set from GNUNET_MESH_connect)
2015  * @param tunnel connection to the other end
2016  * @param tunnel_ctx this is NULL
2017  * @param sender who sent the message
2018  * @param message the actual message
2019  * @param atsi performance data for the connection
2020  * @return GNUNET_OK to keep the connection open,
2021  *         GNUNET_SYSERR to close it (signal serious error)
2022  */
2023 static int
2024 client_handle_close_ack (void *cls,
2025                          struct GNUNET_MESH_Tunnel *tunnel,
2026                          void **tunnel_ctx,
2027                          const struct GNUNET_PeerIdentity *sender,
2028                          const struct GNUNET_MessageHeader *message,
2029                          const struct GNUNET_ATS_Information *atsi)
2030 {
2031   struct GNUNET_STREAM_Socket *socket = cls;
2032
2033   return handle_generic_close_ack (socket,
2034                                    tunnel,
2035                                    sender,
2036                                    (const struct GNUNET_STREAM_MessageHeader *) 
2037                                    message,
2038                                    atsi,
2039                                    SHUT_RDWR);
2040 }
2041
2042 /*****************************/
2043 /* Server's Message Handlers */
2044 /*****************************/
2045
2046 /**
2047  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2048  *
2049  * @param cls the closure
2050  * @param tunnel connection to the other end
2051  * @param tunnel_ctx the socket
2052  * @param sender who sent the message
2053  * @param message the actual message
2054  * @param atsi performance data for the connection
2055  * @return GNUNET_OK to keep the connection open,
2056  *         GNUNET_SYSERR to close it (signal serious error)
2057  */
2058 static int
2059 server_handle_data (void *cls,
2060                     struct GNUNET_MESH_Tunnel *tunnel,
2061                     void **tunnel_ctx,
2062                     const struct GNUNET_PeerIdentity *sender,
2063                     const struct GNUNET_MessageHeader *message,
2064                     const struct GNUNET_ATS_Information*atsi)
2065 {
2066   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2067
2068   return handle_data (socket,
2069                       tunnel,
2070                       sender,
2071                       (const struct GNUNET_STREAM_DataMessage *)message,
2072                       atsi);
2073 }
2074
2075
2076 /**
2077  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2078  *
2079  * @param cls the closure
2080  * @param tunnel connection to the other end
2081  * @param tunnel_ctx the socket
2082  * @param sender who sent the message
2083  * @param message the actual message
2084  * @param atsi performance data for the connection
2085  * @return GNUNET_OK to keep the connection open,
2086  *         GNUNET_SYSERR to close it (signal serious error)
2087  */
2088 static int
2089 server_handle_hello (void *cls,
2090                      struct GNUNET_MESH_Tunnel *tunnel,
2091                      void **tunnel_ctx,
2092                      const struct GNUNET_PeerIdentity *sender,
2093                      const struct GNUNET_MessageHeader *message,
2094                      const struct GNUNET_ATS_Information*atsi)
2095 {
2096   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2097   struct GNUNET_STREAM_HelloAckMessage *reply;
2098
2099   if (0 != memcmp (sender,
2100                    &socket->other_peer,
2101                    sizeof (struct GNUNET_PeerIdentity)))
2102   {
2103     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2104                GNUNET_i2s (&socket->other_peer));
2105     return GNUNET_YES;
2106   }
2107   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2108   GNUNET_assert (socket->tunnel == tunnel);
2109   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2110              GNUNET_i2s (&socket->other_peer));
2111   switch (socket->state)
2112   {
2113   case STATE_INIT:
2114     reply = generate_hello_ack (socket, GNUNET_YES);
2115     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2116                    GNUNET_NO);
2117     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2118                    socket->control_retransmission_task_id);
2119     socket->control_retransmission_task_id =
2120       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2121                                     &control_retransmission_task, socket);
2122     break;
2123   case STATE_HELLO_WAIT:
2124     /* Perhaps our HELLO_ACK was lost */
2125     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2126                    socket->control_retransmission_task_id);
2127     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2128     socket->control_retransmission_task_id =
2129       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2130     break;
2131   default:
2132     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2133                GNUNET_i2s (&socket->other_peer), socket->state);
2134     /* FIXME: Send RESET? */
2135   }
2136   return GNUNET_OK;
2137 }
2138
2139
2140 /**
2141  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2142  *
2143  * @param cls the closure
2144  * @param tunnel connection to the other end
2145  * @param tunnel_ctx the socket
2146  * @param sender who sent the message
2147  * @param message the actual message
2148  * @param atsi performance data for the connection
2149  * @return GNUNET_OK to keep the connection open,
2150  *         GNUNET_SYSERR to close it (signal serious error)
2151  */
2152 static int
2153 server_handle_hello_ack (void *cls,
2154                          struct GNUNET_MESH_Tunnel *tunnel,
2155                          void **tunnel_ctx,
2156                          const struct GNUNET_PeerIdentity *sender,
2157                          const struct GNUNET_MessageHeader *message,
2158                          const struct GNUNET_ATS_Information*atsi)
2159 {
2160   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2161   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2162
2163   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2164                  ntohs (message->type));
2165   GNUNET_assert (socket->tunnel == tunnel);
2166   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2167   switch (socket->state)  
2168   {
2169   case STATE_HELLO_WAIT:
2170     LOG (GNUNET_ERROR_TYPE_DEBUG,
2171          "%s: Received HELLO_ACK from %s\n",
2172          GNUNET_i2s (&socket->other_peer),
2173          GNUNET_i2s (&socket->other_peer));
2174     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2175     LOG (GNUNET_ERROR_TYPE_DEBUG,
2176          "%s: Read sequence number %u\n",
2177          GNUNET_i2s (&socket->other_peer),
2178          (unsigned int) socket->read_sequence_number);
2179     socket->receiver_window_available = 
2180       ntohl (ack_message->receiver_window_size);
2181     set_state_established (NULL, socket);
2182     break;
2183   default:
2184     LOG (GNUNET_ERROR_TYPE_DEBUG,
2185          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2186   }
2187   return GNUNET_OK;
2188 }
2189
2190
2191 /**
2192  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2193  *
2194  * @param cls the closure
2195  * @param tunnel connection to the other end
2196  * @param tunnel_ctx the socket
2197  * @param sender who sent the message
2198  * @param message the actual message
2199  * @param atsi performance data for the connection
2200  * @return GNUNET_OK to keep the connection open,
2201  *         GNUNET_SYSERR to close it (signal serious error)
2202  */
2203 static int
2204 server_handle_reset (void *cls,
2205                      struct GNUNET_MESH_Tunnel *tunnel,
2206                      void **tunnel_ctx,
2207                      const struct GNUNET_PeerIdentity *sender,
2208                      const struct GNUNET_MessageHeader *message,
2209                      const struct GNUNET_ATS_Information*atsi)
2210 {
2211   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2212
2213   return GNUNET_OK;
2214 }
2215
2216
2217 /**
2218  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2219  *
2220  * @param cls the closure
2221  * @param tunnel connection to the other end
2222  * @param tunnel_ctx the socket
2223  * @param sender who sent the message
2224  * @param message the actual message
2225  * @param atsi performance data for the connection
2226  * @return GNUNET_OK to keep the connection open,
2227  *         GNUNET_SYSERR to close it (signal serious error)
2228  */
2229 static int
2230 server_handle_transmit_close (void *cls,
2231                               struct GNUNET_MESH_Tunnel *tunnel,
2232                               void **tunnel_ctx,
2233                               const struct GNUNET_PeerIdentity *sender,
2234                               const struct GNUNET_MessageHeader *message,
2235                               const struct GNUNET_ATS_Information*atsi)
2236 {
2237   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2238
2239   return handle_transmit_close (socket,
2240                                 tunnel,
2241                                 sender,
2242                                 (struct GNUNET_STREAM_MessageHeader *)message,
2243                                 atsi);
2244 }
2245
2246
2247 /**
2248  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2249  *
2250  * @param cls the closure
2251  * @param tunnel connection to the other end
2252  * @param tunnel_ctx the socket
2253  * @param sender who sent the message
2254  * @param message the actual message
2255  * @param atsi performance data for the connection
2256  * @return GNUNET_OK to keep the connection open,
2257  *         GNUNET_SYSERR to close it (signal serious error)
2258  */
2259 static int
2260 server_handle_transmit_close_ack (void *cls,
2261                                   struct GNUNET_MESH_Tunnel *tunnel,
2262                                   void **tunnel_ctx,
2263                                   const struct GNUNET_PeerIdentity *sender,
2264                                   const struct GNUNET_MessageHeader *message,
2265                                   const struct GNUNET_ATS_Information*atsi)
2266 {
2267   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2268
2269   return handle_generic_close_ack (socket,
2270                                    tunnel,
2271                                    sender,
2272                                    (const struct GNUNET_STREAM_MessageHeader *)
2273                                    message,
2274                                    atsi,
2275                                    SHUT_WR);
2276 }
2277
2278
2279 /**
2280  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2281  *
2282  * @param cls the closure
2283  * @param tunnel connection to the other end
2284  * @param tunnel_ctx the socket
2285  * @param sender who sent the message
2286  * @param message the actual message
2287  * @param atsi performance data for the connection
2288  * @return GNUNET_OK to keep the connection open,
2289  *         GNUNET_SYSERR to close it (signal serious error)
2290  */
2291 static int
2292 server_handle_receive_close (void *cls,
2293                              struct GNUNET_MESH_Tunnel *tunnel,
2294                              void **tunnel_ctx,
2295                              const struct GNUNET_PeerIdentity *sender,
2296                              const struct GNUNET_MessageHeader *message,
2297                              const struct GNUNET_ATS_Information*atsi)
2298 {
2299   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2300
2301   return
2302     handle_receive_close (socket,
2303                           tunnel,
2304                           sender,
2305                           (const struct GNUNET_STREAM_MessageHeader *) message,
2306                           atsi);
2307 }
2308
2309
2310 /**
2311  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2312  *
2313  * @param cls the closure
2314  * @param tunnel connection to the other end
2315  * @param tunnel_ctx the socket
2316  * @param sender who sent the message
2317  * @param message the actual message
2318  * @param atsi performance data for the connection
2319  * @return GNUNET_OK to keep the connection open,
2320  *         GNUNET_SYSERR to close it (signal serious error)
2321  */
2322 static int
2323 server_handle_receive_close_ack (void *cls,
2324                                  struct GNUNET_MESH_Tunnel *tunnel,
2325                                  void **tunnel_ctx,
2326                                  const struct GNUNET_PeerIdentity *sender,
2327                                  const struct GNUNET_MessageHeader *message,
2328                                  const struct GNUNET_ATS_Information*atsi)
2329 {
2330   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2331
2332   return handle_generic_close_ack (socket,
2333                                    tunnel,
2334                                    sender,
2335                                    (const struct GNUNET_STREAM_MessageHeader *)
2336                                    message,
2337                                    atsi,
2338                                    SHUT_RD);
2339 }
2340
2341
2342 /**
2343  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2344  *
2345  * @param cls the listen socket (from GNUNET_MESH_connect in
2346  *          GNUNET_STREAM_listen) 
2347  * @param tunnel connection to the other end
2348  * @param tunnel_ctx the socket
2349  * @param sender who sent the message
2350  * @param message the actual message
2351  * @param atsi performance data for the connection
2352  * @return GNUNET_OK to keep the connection open,
2353  *         GNUNET_SYSERR to close it (signal serious error)
2354  */
2355 static int
2356 server_handle_close (void *cls,
2357                      struct GNUNET_MESH_Tunnel *tunnel,
2358                      void **tunnel_ctx,
2359                      const struct GNUNET_PeerIdentity *sender,
2360                      const struct GNUNET_MessageHeader *message,
2361                      const struct GNUNET_ATS_Information*atsi)
2362 {
2363   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2364   
2365   return handle_close (socket,
2366                        tunnel,
2367                        sender,
2368                        (const struct GNUNET_STREAM_MessageHeader *) message,
2369                        atsi);
2370 }
2371
2372
2373 /**
2374  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2375  *
2376  * @param cls the closure
2377  * @param tunnel connection to the other end
2378  * @param tunnel_ctx the socket
2379  * @param sender who sent the message
2380  * @param message the actual message
2381  * @param atsi performance data for the connection
2382  * @return GNUNET_OK to keep the connection open,
2383  *         GNUNET_SYSERR to close it (signal serious error)
2384  */
2385 static int
2386 server_handle_close_ack (void *cls,
2387                          struct GNUNET_MESH_Tunnel *tunnel,
2388                          void **tunnel_ctx,
2389                          const struct GNUNET_PeerIdentity *sender,
2390                          const struct GNUNET_MessageHeader *message,
2391                          const struct GNUNET_ATS_Information*atsi)
2392 {
2393   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2394
2395   return handle_generic_close_ack (socket,
2396                                    tunnel,
2397                                    sender,
2398                                    (const struct GNUNET_STREAM_MessageHeader *) 
2399                                    message,
2400                                    atsi,
2401                                    SHUT_RDWR);
2402 }
2403
2404
2405 /**
2406  * Handler for DATA_ACK messages
2407  *
2408  * @param socket the socket through which the ack was received
2409  * @param tunnel connection to the other end
2410  * @param sender who sent the message
2411  * @param ack the acknowledgment message
2412  * @param atsi performance data for the connection
2413  * @return GNUNET_OK to keep the connection open,
2414  *         GNUNET_SYSERR to close it (signal serious error)
2415  */
2416 static int
2417 handle_ack (struct GNUNET_STREAM_Socket *socket,
2418             struct GNUNET_MESH_Tunnel *tunnel,
2419             const struct GNUNET_PeerIdentity *sender,
2420             const struct GNUNET_STREAM_AckMessage *ack,
2421             const struct GNUNET_ATS_Information*atsi)
2422 {
2423   unsigned int packet;
2424   int need_retransmission;
2425   uint32_t sequence_difference;
2426   
2427   if (0 != memcmp (sender,
2428                    &socket->other_peer,
2429                    sizeof (struct GNUNET_PeerIdentity)))
2430   {
2431     LOG (GNUNET_ERROR_TYPE_DEBUG,
2432          "%s: Received ACK from non-confirming peer\n",
2433          GNUNET_i2s (&socket->other_peer));
2434     return GNUNET_YES;
2435   }
2436   switch (socket->state)
2437   {
2438   case (STATE_ESTABLISHED):
2439   case (STATE_RECEIVE_CLOSED):
2440   case (STATE_RECEIVE_CLOSE_WAIT):
2441     if (NULL == socket->write_handle)
2442     {
2443       LOG (GNUNET_ERROR_TYPE_DEBUG,
2444            "%s: Received DATA_ACK when write_handle is NULL\n",
2445            GNUNET_i2s (&socket->other_peer));
2446       return GNUNET_OK;
2447     }
2448     sequence_difference = 
2449         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2450     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2451     {
2452       LOG (GNUNET_ERROR_TYPE_DEBUG,
2453            "%s: Received DATA_ACK with unexpected base sequence number\n",
2454            GNUNET_i2s (&socket->other_peer));
2455       LOG (GNUNET_ERROR_TYPE_DEBUG,
2456            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2457            GNUNET_i2s (&socket->other_peer),
2458            socket->write_sequence_number,
2459            ntohl (ack->base_sequence_number));
2460       return GNUNET_OK;
2461     }
2462     /* FIXME: include the case when write_handle is cancelled - ignore the 
2463        acks */
2464     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2465          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2466     /* Cancel the retransmission task */
2467     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2468     {
2469       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2470       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2471     }
2472     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2473     {
2474       if (NULL == socket->write_handle->messages[packet]) break;
2475       /* BS: Base sequence from ack; PS: sequence num of current packet */
2476       sequence_difference = ntohl (ack->base_sequence_number)
2477         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2478       if ((0 == sequence_difference) ||
2479           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2480         continue; /* The message in our handle is not yet received */
2481       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2482       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2483       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2484                             packet, GNUNET_YES);
2485     }
2486     /* Update the receive window remaining
2487        FIXME : Should update with the value from a data ack with greater
2488        sequence number */
2489     socket->receiver_window_available = 
2490       ntohl (ack->receive_window_remaining);
2491     /* Check if we have received all acknowledgements */
2492     need_retransmission = GNUNET_NO;
2493     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2494     {
2495       if (NULL == socket->write_handle->messages[packet]) break;
2496       if (GNUNET_YES != ackbitmap_is_bit_set 
2497           (&socket->write_handle->ack_bitmap,packet))
2498       {
2499         need_retransmission = GNUNET_YES;
2500         break;
2501       }
2502     }
2503     if (GNUNET_YES == need_retransmission)
2504     {
2505       write_data (socket);
2506     }
2507     else      /* We have to call the write continuation callback now */
2508     {
2509       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2510       
2511       /* Free the packets */
2512       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2513       {
2514         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2515       }
2516       write_handle = socket->write_handle;
2517       socket->write_handle = NULL;
2518       if (NULL != write_handle->write_cont)
2519         write_handle->write_cont (write_handle->write_cont_cls,
2520                                   socket->status,
2521                                   write_handle->size);
2522       /* We are done with the write handle - Freeing it */
2523       GNUNET_free (write_handle);
2524       LOG (GNUNET_ERROR_TYPE_DEBUG,
2525            "%s: Write completion callback completed\n",
2526            GNUNET_i2s (&socket->other_peer));      
2527     }
2528     break;
2529   default:
2530     break;
2531   }
2532   return GNUNET_OK;
2533 }
2534
2535
2536 /**
2537  * Handler for DATA_ACK messages
2538  *
2539  * @param cls the 'struct GNUNET_STREAM_Socket'
2540  * @param tunnel connection to the other end
2541  * @param tunnel_ctx unused
2542  * @param sender who sent the message
2543  * @param message the actual message
2544  * @param atsi performance data for the connection
2545  * @return GNUNET_OK to keep the connection open,
2546  *         GNUNET_SYSERR to close it (signal serious error)
2547  */
2548 static int
2549 client_handle_ack (void *cls,
2550                    struct GNUNET_MESH_Tunnel *tunnel,
2551                    void **tunnel_ctx,
2552                    const struct GNUNET_PeerIdentity *sender,
2553                    const struct GNUNET_MessageHeader *message,
2554                    const struct GNUNET_ATS_Information*atsi)
2555 {
2556   struct GNUNET_STREAM_Socket *socket = cls;
2557   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2558  
2559   return handle_ack (socket, tunnel, sender, ack, atsi);
2560 }
2561
2562
2563 /**
2564  * Handler for DATA_ACK messages
2565  *
2566  * @param cls the server's listen socket
2567  * @param tunnel connection to the other end
2568  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2569  * @param sender who sent the message
2570  * @param message the actual message
2571  * @param atsi performance data for the connection
2572  * @return GNUNET_OK to keep the connection open,
2573  *         GNUNET_SYSERR to close it (signal serious error)
2574  */
2575 static int
2576 server_handle_ack (void *cls,
2577                    struct GNUNET_MESH_Tunnel *tunnel,
2578                    void **tunnel_ctx,
2579                    const struct GNUNET_PeerIdentity *sender,
2580                    const struct GNUNET_MessageHeader *message,
2581                    const struct GNUNET_ATS_Information*atsi)
2582 {
2583   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2584   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2585  
2586   return handle_ack (socket, tunnel, sender, ack, atsi);
2587 }
2588
2589
2590 /**
2591  * For client message handlers, the stream socket is in the
2592  * closure argument.
2593  */
2594 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2595   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2596   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2597    sizeof (struct GNUNET_STREAM_AckMessage) },
2598   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2599    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2600   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2601    sizeof (struct GNUNET_STREAM_MessageHeader)},
2602   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2603    sizeof (struct GNUNET_STREAM_MessageHeader)},
2604   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2605    sizeof (struct GNUNET_STREAM_MessageHeader)},
2606   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2607    sizeof (struct GNUNET_STREAM_MessageHeader)},
2608   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2609    sizeof (struct GNUNET_STREAM_MessageHeader)},
2610   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2611    sizeof (struct GNUNET_STREAM_MessageHeader)},
2612   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2613    sizeof (struct GNUNET_STREAM_MessageHeader)},
2614   {NULL, 0, 0}
2615 };
2616
2617
2618 /**
2619  * For server message handlers, the stream socket is in the
2620  * tunnel context, and the listen socket in the closure argument.
2621  */
2622 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2623   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2624   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2625    sizeof (struct GNUNET_STREAM_AckMessage) },
2626   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2627    sizeof (struct GNUNET_STREAM_MessageHeader)},
2628   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2629    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2630   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2631    sizeof (struct GNUNET_STREAM_MessageHeader)},
2632   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2633    sizeof (struct GNUNET_STREAM_MessageHeader)},
2634   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2635    sizeof (struct GNUNET_STREAM_MessageHeader)},
2636   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2637    sizeof (struct GNUNET_STREAM_MessageHeader)},
2638   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2639    sizeof (struct GNUNET_STREAM_MessageHeader)},
2640   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2641    sizeof (struct GNUNET_STREAM_MessageHeader)},
2642   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2643    sizeof (struct GNUNET_STREAM_MessageHeader)},
2644   {NULL, 0, 0}
2645 };
2646
2647
2648 /**
2649  * Function called when our target peer is connected to our tunnel
2650  *
2651  * @param cls the socket for which this tunnel is created
2652  * @param peer the peer identity of the target
2653  * @param atsi performance data for the connection
2654  */
2655 static void
2656 mesh_peer_connect_callback (void *cls,
2657                             const struct GNUNET_PeerIdentity *peer,
2658                             const struct GNUNET_ATS_Information * atsi)
2659 {
2660   struct GNUNET_STREAM_Socket *socket = cls;
2661   struct GNUNET_STREAM_MessageHeader *message;
2662   
2663   if (0 != memcmp (peer,
2664                    &socket->other_peer,
2665                    sizeof (struct GNUNET_PeerIdentity)))
2666   {
2667     LOG (GNUNET_ERROR_TYPE_DEBUG,
2668          "%s: A peer which is not our target has connected to our tunnel\n",
2669          GNUNET_i2s(peer));
2670     return;
2671   }
2672   LOG (GNUNET_ERROR_TYPE_DEBUG,
2673        "%s: Target peer %s connected\n",
2674        GNUNET_i2s (&socket->other_peer),
2675        GNUNET_i2s (&socket->other_peer));
2676   /* Set state to INIT */
2677   socket->state = STATE_INIT;
2678   /* Send HELLO message */
2679   message = generate_hello ();
2680   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2681   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2682                  socket->control_retransmission_task_id);
2683   socket->control_retransmission_task_id =
2684     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2685                                   &control_retransmission_task, socket);
2686 }
2687
2688
2689 /**
2690  * Function called when our target peer is disconnected from our tunnel
2691  *
2692  * @param cls the socket associated which this tunnel
2693  * @param peer the peer identity of the target
2694  */
2695 static void
2696 mesh_peer_disconnect_callback (void *cls,
2697                                const struct GNUNET_PeerIdentity *peer)
2698 {
2699   struct GNUNET_STREAM_Socket *socket=cls;
2700   
2701   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2702   LOG (GNUNET_ERROR_TYPE_DEBUG,
2703        "%s: Other peer %s disconnected \n",
2704        GNUNET_i2s (&socket->other_peer),
2705        GNUNET_i2s (&socket->other_peer));
2706 }
2707
2708
2709 /**
2710  * Method called whenever a peer creates a tunnel to us
2711  *
2712  * @param cls closure
2713  * @param tunnel new handle to the tunnel
2714  * @param initiator peer that started the tunnel
2715  * @param atsi performance information for the tunnel
2716  * @return initial tunnel context for the tunnel
2717  *         (can be NULL -- that's not an error)
2718  */
2719 static void *
2720 new_tunnel_notify (void *cls,
2721                    struct GNUNET_MESH_Tunnel *tunnel,
2722                    const struct GNUNET_PeerIdentity *initiator,
2723                    const struct GNUNET_ATS_Information *atsi)
2724 {
2725   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2726   struct GNUNET_STREAM_Socket *socket;
2727
2728   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2729      from the same peer again until the socket is closed */
2730
2731   if (GNUNET_NO == lsocket->listening)
2732   {
2733     GNUNET_MESH_tunnel_destroy (tunnel);
2734     return NULL;
2735   }
2736   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2737   socket->other_peer = *initiator;
2738   socket->tunnel = tunnel;
2739   socket->state = STATE_INIT;
2740   socket->lsocket = lsocket;
2741   socket->retransmit_timeout = lsocket->retransmit_timeout;
2742   socket->testing_active = lsocket->testing_active;
2743   socket->testing_set_write_sequence_number_value =
2744       lsocket->testing_set_write_sequence_number_value;
2745   socket->max_payload_size = lsocket->max_payload_size;
2746   LOG (GNUNET_ERROR_TYPE_DEBUG,
2747        "%s: Peer %s initiated tunnel to us\n", 
2748        GNUNET_i2s (&socket->other_peer),
2749        GNUNET_i2s (&socket->other_peer));
2750   return socket;
2751 }
2752
2753
2754 /**
2755  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2756  * any associated state.  This function is NOT called if the client has
2757  * explicitly asked for the tunnel to be destroyed using
2758  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2759  * the tunnel.
2760  *
2761  * @param cls closure (set from GNUNET_MESH_connect)
2762  * @param tunnel connection to the other end (henceforth invalid)
2763  * @param tunnel_ctx place where local state associated
2764  *                   with the tunnel is stored
2765  */
2766 static void 
2767 tunnel_cleaner (void *cls,
2768                 const struct GNUNET_MESH_Tunnel *tunnel,
2769                 void *tunnel_ctx)
2770 {
2771   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2772   struct MessageQueue *head;
2773
2774   GNUNET_assert (tunnel == socket->tunnel);
2775   GNUNET_break_op(0);
2776   LOG (GNUNET_ERROR_TYPE_DEBUG,
2777        "%s: Peer %s has terminated connection abruptly\n",
2778        GNUNET_i2s (&socket->other_peer),
2779        GNUNET_i2s (&socket->other_peer));
2780   socket->status = GNUNET_STREAM_SHUTDOWN;
2781   /* Clear Transmit handles */
2782   if (NULL != socket->transmit_handle)
2783   {
2784     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2785     socket->transmit_handle = NULL;
2786   }
2787   /* Stop Tasks using socket->tunnel */
2788   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2789   {
2790     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2791     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2792   }
2793   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2794   {
2795     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2796     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2797   }  
2798   /* Terminate the control retransmission tasks */
2799   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2800   {
2801     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2802     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2803   }
2804   /* Clear Transmit handles */
2805   if (NULL != socket->transmit_handle)
2806   {
2807     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2808     socket->transmit_handle = NULL;
2809   }
2810   /* Clear existing message queue */
2811   while (NULL != (head = socket->queue_head)) {
2812     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2813                                  socket->queue_tail,
2814                                  head);
2815     GNUNET_free (head->message);
2816     GNUNET_free (head);
2817   }
2818   socket->tunnel = NULL;
2819 }
2820
2821
2822 /**
2823  * Callback to signal timeout on lockmanager lock acquire
2824  *
2825  * @param cls the ListenSocket
2826  * @param tc the scheduler task context
2827  */
2828 static void
2829 lockmanager_acquire_timeout (void *cls, 
2830                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2831 {
2832   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2833   GNUNET_STREAM_ListenCallback listen_cb;
2834   void *listen_cb_cls;
2835
2836   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2837   listen_cb = lsocket->listen_cb;
2838   listen_cb_cls = lsocket->listen_cb_cls;
2839   if (NULL != listen_cb)
2840     listen_cb (listen_cb_cls, NULL, NULL);
2841 }
2842
2843
2844 /**
2845  * Callback to notify us on the status changes on app_port lock
2846  *
2847  * @param cls the ListenSocket
2848  * @param domain the domain name of the lock
2849  * @param lock the app_port
2850  * @param status the current status of the lock
2851  */
2852 static void
2853 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2854                        enum GNUNET_LOCKMANAGER_Status status)
2855 {
2856   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2857
2858   GNUNET_assert (lock == (uint32_t) lsocket->port);
2859   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2860   {
2861     lsocket->listening = GNUNET_YES;
2862     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2863     {
2864       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2865       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2866     }
2867     if (NULL == lsocket->mesh)
2868     {
2869       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2870
2871       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2872                                            lsocket, /* Closure */
2873                                            &new_tunnel_notify,
2874                                            &tunnel_cleaner,
2875                                            server_message_handlers,
2876                                            ports);
2877       GNUNET_assert (NULL != lsocket->mesh);
2878       if (NULL != lsocket->listen_ok_cb)
2879       {
2880         (void) lsocket->listen_ok_cb ();
2881       }
2882     }
2883   }
2884   if (GNUNET_LOCKMANAGER_RELEASE == status)
2885     lsocket->listening = GNUNET_NO;
2886 }
2887
2888
2889 /*****************/
2890 /* API functions */
2891 /*****************/
2892
2893
2894 /**
2895  * Tries to open a stream to the target peer
2896  *
2897  * @param cfg configuration to use
2898  * @param target the target peer to which the stream has to be opened
2899  * @param app_port the application port number which uniquely identifies this
2900  *            stream
2901  * @param open_cb this function will be called after stream has be established;
2902  *          cannot be NULL
2903  * @param open_cb_cls the closure for open_cb
2904  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2905  * @return if successful it returns the stream socket; NULL if stream cannot be
2906  *         opened 
2907  */
2908 struct GNUNET_STREAM_Socket *
2909 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2910                     const struct GNUNET_PeerIdentity *target,
2911                     GNUNET_MESH_ApplicationType app_port,
2912                     GNUNET_STREAM_OpenCallback open_cb,
2913                     void *open_cb_cls,
2914                     ...)
2915 {
2916   struct GNUNET_STREAM_Socket *socket;
2917   enum GNUNET_STREAM_Option option;
2918   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2919   va_list vargs;
2920   uint16_t payload_size;
2921
2922   LOG (GNUNET_ERROR_TYPE_DEBUG,
2923        "%s\n", __func__);
2924   GNUNET_assert (NULL != open_cb);
2925   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2926   socket->other_peer = *target;
2927   socket->open_cb = open_cb;
2928   socket->open_cls = open_cb_cls;
2929   /* Set defaults */
2930   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2931   socket->testing_active = GNUNET_NO;
2932   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2933   va_start (vargs, open_cb_cls); /* Parse variable args */
2934   do {
2935     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2936     switch (option)
2937     {
2938     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2939       /* Expect struct GNUNET_TIME_Relative */
2940       socket->retransmit_timeout = va_arg (vargs,
2941                                            struct GNUNET_TIME_Relative);
2942       break;
2943     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2944       socket->testing_active = GNUNET_YES;
2945       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2946                                                                 uint32_t);
2947       break;
2948     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2949       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2950       break;
2951     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2952       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2953       break;
2954     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2955       payload_size = (uint16_t) va_arg (vargs, unsigned int);
2956       GNUNET_assert (0 != payload_size);
2957       if (payload_size < socket->max_payload_size)
2958         socket->max_payload_size = payload_size;
2959       break;
2960     case GNUNET_STREAM_OPTION_END:
2961       break;
2962     }
2963   } while (GNUNET_STREAM_OPTION_END != option);
2964   va_end (vargs);               /* End of variable args parsing */
2965   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2966                                       socket, /* cls */
2967                                       NULL, /* No inbound tunnel handler */
2968                                       NULL, /* No in-tunnel cleaner */
2969                                       client_message_handlers,
2970                                       ports); /* We don't get inbound tunnels */
2971   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2972   {
2973     GNUNET_free (socket);
2974     return NULL;
2975   }
2976   /* Now create the mesh tunnel to target */
2977   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2978   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2979                                               NULL, /* Tunnel context */
2980                                               &mesh_peer_connect_callback,
2981                                               &mesh_peer_disconnect_callback,
2982                                               socket);
2983   GNUNET_assert (NULL != socket->tunnel);
2984   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2985                                         &socket->other_peer);
2986   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2987   return socket;
2988 }
2989
2990
2991 /**
2992  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2993  *
2994  * @param socket the stream socket
2995  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2996  * @param completion_cb the callback that will be called upon successful
2997  *          shutdown of given operation
2998  * @param completion_cls the closure for the completion callback
2999  * @return the shutdown handle
3000  */
3001 struct GNUNET_STREAM_ShutdownHandle *
3002 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3003                         int operation,
3004                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3005                         void *completion_cls)
3006 {
3007   struct GNUNET_STREAM_ShutdownHandle *handle;
3008   struct GNUNET_STREAM_MessageHeader *msg;
3009   
3010   GNUNET_assert (NULL == socket->shutdown_handle);
3011
3012   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3013   handle->socket = socket;
3014   handle->completion_cb = completion_cb;
3015   handle->completion_cls = completion_cls;
3016   socket->shutdown_handle = handle;
3017
3018   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3019   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3020   switch (operation)
3021   {
3022   case SHUT_RD:
3023     handle->operation = SHUT_RD;
3024     if (NULL != socket->read_handle)
3025       LOG (GNUNET_ERROR_TYPE_WARNING,
3026            "Existing read handle should be cancelled before shutting"
3027            " down reading\n");
3028     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3029     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3030                    GNUNET_NO);
3031     break;
3032   case SHUT_WR:
3033     handle->operation = SHUT_WR;
3034     if (NULL != socket->write_handle)
3035       LOG (GNUNET_ERROR_TYPE_WARNING,
3036            "Existing write handle should be cancelled before shutting"
3037            " down writing\n");
3038     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3039     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3040                    GNUNET_NO);
3041     break;
3042   case SHUT_RDWR:
3043     handle->operation = SHUT_RDWR;
3044     if (NULL != socket->write_handle)
3045       LOG (GNUNET_ERROR_TYPE_WARNING,
3046            "Existing write handle should be cancelled before shutting"
3047            " down writing\n");
3048     if (NULL != socket->read_handle)
3049       LOG (GNUNET_ERROR_TYPE_WARNING,
3050            "Existing read handle should be cancelled before shutting"
3051            " down reading\n");
3052     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3053     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3054     break;
3055   default:
3056     LOG (GNUNET_ERROR_TYPE_WARNING,
3057          "GNUNET_STREAM_shutdown called with invalid value for "
3058          "parameter operation -- Ignoring\n");
3059     GNUNET_free (msg);
3060     GNUNET_free (handle);
3061     return NULL;
3062   }
3063   handle->close_msg_retransmission_task_id =
3064     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3065                                   &close_msg_retransmission_task,
3066                                   handle);
3067   return handle;
3068 }
3069
3070
3071 /**
3072  * Cancels a pending shutdown
3073  *
3074  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3075  */
3076 void
3077 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3078 {
3079   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3080     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3081   handle->socket->shutdown_handle = NULL;
3082   GNUNET_free (handle);
3083 }
3084
3085
3086 /**
3087  * Closes the stream
3088  *
3089  * @param socket the stream socket
3090  */
3091 void
3092 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3093 {
3094   struct MessageQueue *head;
3095
3096   if (NULL != socket->read_handle)
3097   {
3098     LOG (GNUNET_ERROR_TYPE_WARNING,
3099          "Closing STREAM socket when a read handle is pending\n");
3100     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3101   }
3102   if (NULL != socket->write_handle)
3103   {
3104     LOG (GNUNET_ERROR_TYPE_WARNING,
3105          "Closing STREAM socket when a write handle is pending\n");
3106     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3107     //socket->write_handle = NULL;
3108   }
3109   /* Terminate the ack'ing task if they are still present */
3110   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3111   {
3112     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3113     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3114   }
3115   /* Terminate the control retransmission tasks */
3116   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3117   {
3118     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3119   }
3120   /* Clear Transmit handles */
3121   if (NULL != socket->transmit_handle)
3122   {
3123     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3124     socket->transmit_handle = NULL;
3125   }
3126   /* Clear existing message queue */
3127   while (NULL != (head = socket->queue_head)) {
3128     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3129                                  socket->queue_tail,
3130                                  head);
3131     GNUNET_free (head->message);
3132     GNUNET_free (head);
3133   }
3134   /* Close associated tunnel */
3135   if (NULL != socket->tunnel)
3136   {
3137     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3138     socket->tunnel = NULL;
3139   }
3140   /* Close mesh connection */
3141   if (NULL != socket->mesh && NULL == socket->lsocket)
3142   {
3143     GNUNET_MESH_disconnect (socket->mesh);
3144     socket->mesh = NULL;
3145   }  
3146   /* Release receive buffer */
3147   if (NULL != socket->receive_buffer)
3148   {
3149     GNUNET_free (socket->receive_buffer);
3150   }
3151   GNUNET_free (socket);
3152 }
3153
3154
3155 /**
3156  * Listens for stream connections for a specific application ports
3157  *
3158  * @param cfg the configuration to use
3159  * @param app_port the application port for which new streams will be accepted
3160  * @param listen_cb this function will be called when a peer tries to establish
3161  *            a stream with us
3162  * @param listen_cb_cls closure for listen_cb
3163  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3164  * @return listen socket, NULL for any error
3165  */
3166 struct GNUNET_STREAM_ListenSocket *
3167 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3168                       GNUNET_MESH_ApplicationType app_port,
3169                       GNUNET_STREAM_ListenCallback listen_cb,
3170                       void *listen_cb_cls,
3171                       ...)
3172 {
3173   /* FIXME: Add variable args for passing configration options? */
3174   struct GNUNET_STREAM_ListenSocket *lsocket;
3175   struct GNUNET_TIME_Relative listen_timeout;
3176   enum GNUNET_STREAM_Option option;
3177   va_list vargs;
3178   uint16_t payload_size;
3179
3180   GNUNET_assert (NULL != listen_cb);
3181   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3182   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3183   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3184   if (NULL == lsocket->lockmanager)
3185   {
3186     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3187     GNUNET_free (lsocket);
3188     return NULL;
3189   }
3190   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3191   /* Set defaults */
3192   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3193   lsocket->testing_active = GNUNET_NO;
3194   lsocket->listen_ok_cb = NULL;
3195   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3196   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3197   va_start (vargs, listen_cb_cls);
3198   do {
3199     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3200     switch (option)
3201     {
3202     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3203       lsocket->retransmit_timeout = va_arg (vargs,
3204                                             struct GNUNET_TIME_Relative);
3205       break;
3206     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3207       lsocket->testing_active = GNUNET_YES;
3208       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3209                                                                  uint32_t);
3210       break;
3211     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3212       listen_timeout = GNUNET_TIME_relative_multiply
3213         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3214       break;
3215     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3216       lsocket->listen_ok_cb = va_arg (vargs,
3217                                       GNUNET_STREAM_ListenSuccessCallback);
3218       break;
3219     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3220       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3221       GNUNET_assert (0 != payload_size);
3222       if (payload_size < lsocket->max_payload_size)
3223         lsocket->max_payload_size = payload_size;
3224       break;
3225     case GNUNET_STREAM_OPTION_END:
3226       break;
3227     }
3228   } while (GNUNET_STREAM_OPTION_END != option);
3229   va_end (vargs);
3230   lsocket->port = app_port;
3231   lsocket->listen_cb = listen_cb;
3232   lsocket->listen_cb_cls = listen_cb_cls;
3233   lsocket->locking_request = 
3234     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3235                                      (uint32_t) lsocket->port,
3236                                      &lock_status_change_cb, lsocket);
3237   lsocket->lockmanager_acquire_timeout_task =
3238     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3239                                   &lockmanager_acquire_timeout, lsocket);
3240   return lsocket;
3241 }
3242
3243
3244 /**
3245  * Closes the listen socket
3246  *
3247  * @param lsocket the listen socket
3248  */
3249 void
3250 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3251 {
3252   /* Close MESH connection */
3253   if (NULL != lsocket->mesh)
3254     GNUNET_MESH_disconnect (lsocket->mesh);
3255   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3256   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3257     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3258   if (NULL != lsocket->locking_request)
3259     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3260   if (NULL != lsocket->lockmanager)
3261     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3262   GNUNET_free (lsocket);
3263 }
3264
3265
3266 /**
3267  * Tries to write the given data to the stream. The maximum size of data that
3268  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3269  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3270  * violation, however only the said number of maximum bytes will be written.
3271  *
3272  * @param socket the socket representing a stream
3273  * @param data the data buffer from where the data is written into the stream
3274  * @param size the number of bytes to be written from the data buffer
3275  * @param timeout the timeout period
3276  * @param write_cont the function to call upon writing some bytes into the
3277  *          stream 
3278  * @param write_cont_cls the closure
3279  *
3280  * @return handle to cancel the operation; if a previous write is pending or
3281  *           the stream has been shutdown for this operation then write_cont is
3282  *           immediately called and NULL is returned.
3283  */
3284 struct GNUNET_STREAM_IOWriteHandle *
3285 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3286                      const void *data,
3287                      size_t size,
3288                      struct GNUNET_TIME_Relative timeout,
3289                      GNUNET_STREAM_CompletionContinuation write_cont,
3290                      void *write_cont_cls)
3291 {
3292   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3293   struct GNUNET_STREAM_DataMessage *data_msg;
3294   const void *sweep;
3295   struct GNUNET_TIME_Relative ack_deadline;
3296   unsigned int num_needed_packets;
3297   unsigned int packet;
3298   uint32_t packet_size;
3299   uint32_t payload_size;
3300   uint16_t max_data_packet_size;
3301
3302   LOG (GNUNET_ERROR_TYPE_DEBUG,
3303        "%s\n", __func__);
3304   if (NULL != socket->write_handle)
3305   {
3306     GNUNET_break (0);
3307     return NULL;
3308   }
3309   switch (socket->state)
3310   {
3311   case STATE_TRANSMIT_CLOSED:
3312   case STATE_TRANSMIT_CLOSE_WAIT:
3313   case STATE_CLOSED:
3314   case STATE_CLOSE_WAIT:
3315     if (NULL != write_cont)
3316       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3317     LOG (GNUNET_ERROR_TYPE_DEBUG,
3318          "%s() END\n", __func__);
3319     return NULL;
3320   case STATE_INIT:
3321   case STATE_LISTEN:
3322   case STATE_HELLO_WAIT:
3323     if (NULL != write_cont)
3324       /* FIXME: GNUNET_STREAM_SYSERR?? */
3325       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3326     LOG (GNUNET_ERROR_TYPE_DEBUG,
3327          "%s() END\n", __func__);
3328     return NULL;
3329   case STATE_ESTABLISHED:
3330   case STATE_RECEIVE_CLOSED:
3331   case STATE_RECEIVE_CLOSE_WAIT:
3332     break;
3333   }
3334   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3335     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3336   num_needed_packets =
3337       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3338   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3339   io_handle->socket = socket;
3340   io_handle->write_cont = write_cont;
3341   io_handle->write_cont_cls = write_cont_cls;
3342   io_handle->size = size;
3343   io_handle->packets_sent = 0;
3344   sweep = data;
3345   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3346      determined from RTT */
3347   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3348   /* Divide the given buffer into packets for sending */
3349   max_data_packet_size =
3350       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3351   for (packet=0; packet < num_needed_packets; packet++)
3352   {
3353     if ((packet + 1) * socket->max_payload_size < size) 
3354     {
3355       payload_size = socket->max_payload_size;
3356       packet_size = max_data_packet_size;
3357     }
3358     else 
3359     {
3360       payload_size = size - packet * socket->max_payload_size;
3361       packet_size = 
3362           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3363     }
3364     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3365     io_handle->messages[packet]->header.header.size = htons (packet_size);
3366     io_handle->messages[packet]->header.header.type =
3367       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3368     io_handle->messages[packet]->sequence_number =
3369       htonl (socket->write_sequence_number++);
3370     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3371     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3372        determined from RTT */
3373     io_handle->messages[packet]->ack_deadline =
3374       GNUNET_TIME_relative_hton (ack_deadline);
3375     data_msg = io_handle->messages[packet];
3376     /* Copy data from given buffer to the packet */
3377     memcpy (&data_msg[1], sweep, payload_size);
3378     sweep += payload_size;
3379     socket->write_offset += payload_size;
3380   }
3381   /* ack the last data message. FIXME: remove when we figure out how to do this
3382      using RTT */
3383   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3384       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3385   socket->write_handle = io_handle;
3386   write_data (socket);
3387   LOG (GNUNET_ERROR_TYPE_DEBUG,
3388        "%s() END\n", __func__);
3389   return io_handle;
3390 }
3391
3392
3393 /**
3394  * Tries to read data from the stream.
3395  *
3396  * @param socket the socket representing a stream
3397  * @param timeout the timeout period
3398  * @param proc function to call with data (once only)
3399  * @param proc_cls the closure for proc
3400  *
3401  * @return handle to cancel the operation; NULL is returned if: the stream has
3402  *           been shutdown for this type of opeartion (the DataProcessor is
3403  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3404  *           read handle is present (only one read handle per socket is present
3405  *           at any time)
3406  */
3407 struct GNUNET_STREAM_IOReadHandle *
3408 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3409                     struct GNUNET_TIME_Relative timeout,
3410                     GNUNET_STREAM_DataProcessor proc,
3411                     void *proc_cls)
3412 {
3413   struct GNUNET_STREAM_IOReadHandle *read_handle;
3414   
3415   LOG (GNUNET_ERROR_TYPE_DEBUG,
3416        "%s: %s()\n", 
3417        GNUNET_i2s (&socket->other_peer),
3418        __func__);
3419   /* Return NULL if there is already a read handle; the user has to cancel that
3420      first before continuing or has to wait until it is completed */
3421   if (NULL != socket->read_handle) 
3422     return NULL;
3423   GNUNET_assert (NULL != proc);
3424   switch (socket->state)
3425   {
3426   case STATE_RECEIVE_CLOSED:
3427   case STATE_RECEIVE_CLOSE_WAIT:
3428   case STATE_CLOSED:
3429   case STATE_CLOSE_WAIT:
3430     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3431     LOG (GNUNET_ERROR_TYPE_DEBUG,
3432          "%s: %s() END\n",
3433          GNUNET_i2s (&socket->other_peer),
3434          __func__);
3435     return NULL;
3436   default:
3437     break;
3438   }
3439   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3440   read_handle->proc = proc;
3441   read_handle->proc_cls = proc_cls;
3442   read_handle->socket = socket;
3443   socket->read_handle = read_handle;
3444   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3445                                           0))
3446     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3447                                                           socket);   
3448   read_handle->read_io_timeout_task_id =
3449       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3450   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3451        GNUNET_i2s (&socket->other_peer), __func__);
3452   return read_handle;
3453 }
3454
3455
3456 /**
3457  * Cancel pending write operation.
3458  *
3459  * @param ioh handle to operation to cancel
3460  */
3461 void
3462 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3463 {
3464   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3465   unsigned int packet;
3466
3467   GNUNET_assert (NULL != socket->write_handle);
3468   GNUNET_assert (socket->write_handle == ioh);
3469
3470   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3471   {
3472     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3473     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3474   }
3475
3476   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3477   {
3478     if (NULL == ioh->messages[packet]) break;
3479     GNUNET_free (ioh->messages[packet]);
3480   }
3481       
3482   GNUNET_free (socket->write_handle);
3483   socket->write_handle = NULL;
3484 }
3485
3486
3487 /**
3488  * Cancel pending read operation.
3489  *
3490  * @param ioh handle to operation to cancel
3491  */
3492 void
3493 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3494 {
3495   struct GNUNET_STREAM_Socket *socket;
3496   
3497   socket = ioh->socket;
3498   GNUNET_assert (NULL != socket->read_handle);
3499   GNUNET_assert (ioh == socket->read_handle);
3500   /* Read io time task should be there; if it is already executed then this
3501   read handle is not valid; However upon scheduler shutdown the read io task
3502   may be executed before */
3503   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3504     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3505   /* reading task may be present; if so we have to stop it */
3506   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3507     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3508   GNUNET_free (ioh);
3509   socket->read_handle = NULL;
3510 }
3511
3512 /* end of stream_api.c */