f042ccc938b0293a5454d55deb0073edddb2d49c
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 #define LOG(kind,...)                                   \
46   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
47
48 #define TIME_REL_SECS(sec) \
49   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
50
51 /**
52  * The maximum packet size of a stream packet
53  */
54 #define MAX_PACKET_SIZE 512//64000
55
56 /**
57  * Receive buffer
58  */
59 #define RECEIVE_BUFFER_SIZE 4096000
60
61 /**
62  * The maximum payload a data message packet can carry
63  */
64 static const size_t max_payload_size = 
65   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
66
67 /**
68  * states in the Protocol
69  */
70 enum State
71   {
72     /**
73      * Client initialization state
74      */
75     STATE_INIT,
76
77     /**
78      * Listener initialization state 
79      */
80     STATE_LISTEN,
81
82     /**
83      * Pre-connection establishment state
84      */
85     STATE_HELLO_WAIT,
86
87     /**
88      * State where a connection has been established
89      */
90     STATE_ESTABLISHED,
91
92     /**
93      * State where the socket is closed on our side and waiting to be ACK'ed
94      */
95     STATE_RECEIVE_CLOSE_WAIT,
96
97     /**
98      * State where the socket is closed for reading
99      */
100     STATE_RECEIVE_CLOSED,
101
102     /**
103      * State where the socket is closed on our side and waiting to be ACK'ed
104      */
105     STATE_TRANSMIT_CLOSE_WAIT,
106
107     /**
108      * State where the socket is closed for writing
109      */
110     STATE_TRANSMIT_CLOSED,
111
112     /**
113      * State where the socket is closed on our side and waiting to be ACK'ed
114      */
115     STATE_CLOSE_WAIT,
116
117     /**
118      * State where the socket is closed
119      */
120     STATE_CLOSED 
121   };
122
123
124 /**
125  * Functions of this type are called when a message is written
126  *
127  * @param cls the closure from queue_message
128  * @param socket the socket the written message was bound to
129  */
130 typedef void (*SendFinishCallback) (void *cls,
131                                     struct GNUNET_STREAM_Socket *socket);
132
133
134 /**
135  * The send message queue
136  */
137 struct MessageQueue
138 {
139   /**
140    * The message
141    */
142   struct GNUNET_STREAM_MessageHeader *message;
143
144   /**
145    * Callback to be called when the message is sent
146    */
147   SendFinishCallback finish_cb;
148
149   /**
150    * The closure for finish_cb
151    */
152   void *finish_cb_cls;
153
154   /**
155    * The next message in queue. Should be NULL in the last message
156    */
157   struct MessageQueue *next;
158
159   /**
160    * The next message in queue. Should be NULL in the first message
161    */
162   struct MessageQueue *prev;
163 };
164
165
166 /**
167  * The STREAM Socket Handler
168  */
169 struct GNUNET_STREAM_Socket
170 {
171   /**
172    * Retransmission timeout
173    */
174   struct GNUNET_TIME_Relative retransmit_timeout;
175
176   /**
177    * The Acknowledgement Bitmap
178    */
179   GNUNET_STREAM_AckBitmap ack_bitmap;
180
181   /**
182    * Time when the Acknowledgement was queued
183    */
184   struct GNUNET_TIME_Absolute ack_time_registered;
185
186   /**
187    * Queued Acknowledgement deadline
188    */
189   struct GNUNET_TIME_Relative ack_time_deadline;
190
191   /**
192    * The mesh handle
193    */
194   struct GNUNET_MESH_Handle *mesh;
195
196   /**
197    * The mesh tunnel handle
198    */
199   struct GNUNET_MESH_Tunnel *tunnel;
200
201   /**
202    * Stream open closure
203    */
204   void *open_cls;
205
206   /**
207    * Stream open callback
208    */
209   GNUNET_STREAM_OpenCallback open_cb;
210
211   /**
212    * The current transmit handle (if a pending transmit request exists)
213    */
214   struct GNUNET_MESH_TransmitHandle *transmit_handle;
215
216   /**
217    * The current act transmit handle (if a pending ack transmit request exists)
218    */
219   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
220
221   /**
222    * Pointer to the current ack message using in ack_task
223    */
224   struct GNUNET_STREAM_AckMessage *ack_msg;
225
226   /**
227    * The current message associated with the transmit handle
228    */
229   struct MessageQueue *queue_head;
230
231   /**
232    * The queue tail, should always point to the last message in queue
233    */
234   struct MessageQueue *queue_tail;
235
236   /**
237    * The write IO_handle associated with this socket
238    */
239   struct GNUNET_STREAM_IOWriteHandle *write_handle;
240
241   /**
242    * The read IO_handle associated with this socket
243    */
244   struct GNUNET_STREAM_IOReadHandle *read_handle;
245
246   /**
247    * The shutdown handle associated with this socket
248    */
249   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
250
251   /**
252    * Buffer for storing received messages
253    */
254   void *receive_buffer;
255
256   /**
257    * The listen socket from which this socket is derived. Should be NULL if it
258    * is not a derived socket
259    */
260   struct GNUNET_STREAM_ListenSocket *lsocket;
261
262   /**
263    * The peer identity of the peer at the other end of the stream
264    */
265   struct GNUNET_PeerIdentity other_peer;
266
267   /**
268    * Task identifier for the read io timeout task
269    */
270   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
271
272   /**
273    * Task identifier for retransmission task after timeout
274    */
275   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
276
277   /**
278    * The task for sending timely Acks
279    */
280   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
281
282   /**
283    * Task scheduled to continue a read operation.
284    */
285   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
286
287   /**
288    * The state of the protocol associated with this socket
289    */
290   enum State state;
291
292   /**
293    * The status of the socket
294    */
295   enum GNUNET_STREAM_Status status;
296
297   /**
298    * The number of previous timeouts; FIXME: currently not used
299    */
300   unsigned int retries;
301
302   /**
303    * The application port number (type: uint32_t)
304    */
305   GNUNET_MESH_ApplicationType app_port;
306
307   /**
308    * Whether testing mode is active or not
309    */
310   int testing_active;
311
312   /**
313    * The write sequence number to be set incase of testing
314    */
315   uint32_t testing_set_write_sequence_number_value;
316
317   /**
318    * The session id associated with this stream connection
319    * FIXME: Not used currently, may be removed
320    */
321   uint32_t session_id;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363 };
364
365
366 /**
367  * A socket for listening
368  */
369 struct GNUNET_STREAM_ListenSocket
370 {
371   /**
372    * The mesh handle
373    */
374   struct GNUNET_MESH_Handle *mesh;
375
376   /**
377    * Our configuration
378    */
379   struct GNUNET_CONFIGURATION_Handle *cfg;
380
381   /**
382    * Handle to the lock manager service
383    */
384   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
385
386   /**
387    * The active LockingRequest from lockmanager
388    */
389   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
390
391   /**
392    * Callback to call after acquring a lock and listening
393    */
394   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
395
396   /**
397    * The callback function which is called after successful opening socket
398    */
399   GNUNET_STREAM_ListenCallback listen_cb;
400
401   /**
402    * The call back closure
403    */
404   void *listen_cb_cls;
405
406   /**
407    * The service port
408    */
409   GNUNET_MESH_ApplicationType port;
410   
411   /**
412    * The id of the lockmanager timeout task
413    */
414   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
415
416   /**
417    * The retransmit timeout
418    */
419   struct GNUNET_TIME_Relative retransmit_timeout;
420   
421   /**
422    * Listen enabled?
423    */
424   int listening;
425
426   /**
427    * Whether testing mode is active or not
428    */
429   int testing_active;
430
431   /**
432    * The write sequence number to be set incase of testing
433    */
434   uint32_t testing_set_write_sequence_number_value;
435 };
436
437
438 /**
439  * The IO Write Handle
440  */
441 struct GNUNET_STREAM_IOWriteHandle
442 {
443   /**
444    * The socket to which this write handle is associated
445    */
446   struct GNUNET_STREAM_Socket *socket;
447
448   /**
449    * The packet_buffers associated with this Handle
450    */
451   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
452
453   /**
454    * The write continuation callback
455    */
456   GNUNET_STREAM_CompletionContinuation write_cont;
457
458   /**
459    * Write continuation closure
460    */
461   void *write_cont_cls;
462
463   /**
464    * The bitmap of this IOHandle; Corresponding bit for a message is set when
465    * it has been acknowledged by the receiver
466    */
467   GNUNET_STREAM_AckBitmap ack_bitmap;
468
469   /**
470    * Number of bytes in this write handle
471    */
472   size_t size;
473 };
474
475
476 /**
477  * The IO Read Handle
478  */
479 struct GNUNET_STREAM_IOReadHandle
480 {
481   /**
482    * Callback for the read processor
483    */
484   GNUNET_STREAM_DataProcessor proc;
485
486   /**
487    * The closure pointer for the read processor callback
488    */
489   void *proc_cls;
490 };
491
492
493 /**
494  * Handle for Shutdown
495  */
496 struct GNUNET_STREAM_ShutdownHandle
497 {
498   /**
499    * The socket associated with this shutdown handle
500    */
501   struct GNUNET_STREAM_Socket *socket;
502
503   /**
504    * Shutdown completion callback
505    */
506   GNUNET_STREAM_ShutdownCompletion completion_cb;
507
508   /**
509    * Closure for completion callback
510    */
511   void *completion_cls;
512
513   /**
514    * Close message retransmission task id
515    */
516   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
517
518   /**
519    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
520    */
521   int operation;  
522 };
523
524
525 /**
526  * Default value in seconds for various timeouts
527  */
528 static const unsigned int default_timeout = 10;
529
530 /**
531  * The domain name for locks we use here
532  */
533 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
534
535
536 /**
537  * Callback function for sending queued message
538  *
539  * @param cls closure the socket
540  * @param size number of bytes available in buf
541  * @param buf where the callee should write the message
542  * @return number of bytes written to buf
543  */
544 static size_t
545 send_message_notify (void *cls, size_t size, void *buf)
546 {
547   struct GNUNET_STREAM_Socket *socket = cls;
548   struct MessageQueue *head;
549   size_t ret;
550
551   socket->transmit_handle = NULL; /* Remove the transmit handle */
552   head = socket->queue_head;
553   if (NULL == head)
554     return 0; /* just to be safe */
555   if (0 == size)                /* request timed out */
556   {
557     socket->retries++;
558     LOG (GNUNET_ERROR_TYPE_DEBUG,
559          "%s: Message sending timed out. Retry %d \n",
560          GNUNET_i2s (&socket->other_peer),
561          socket->retries);
562     socket->transmit_handle = 
563       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
564                                          0, /* Corking */
565                                          1, /* Priority */
566                                          /* FIXME: exponential backoff */
567                                          socket->retransmit_timeout,
568                                          &socket->other_peer,
569                                          ntohs (head->message->header.size),
570                                          &send_message_notify,
571                                          socket);
572     return 0;
573   }
574
575   ret = ntohs (head->message->header.size);
576   GNUNET_assert (size >= ret);
577   memcpy (buf, head->message, ret);
578   if (NULL != head->finish_cb)
579   {
580     head->finish_cb (head->finish_cb_cls, socket);
581   }
582   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
583                                socket->queue_tail,
584                                head);
585   GNUNET_free (head->message);
586   GNUNET_free (head);
587   head = socket->queue_head;
588   if (NULL != head)    /* more pending messages to send */
589   {
590     socket->retries = 0;
591     socket->transmit_handle = 
592       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
593                                          0, /* Corking */
594                                          1, /* Priority */
595                                          /* FIXME: exponential backoff */
596                                          socket->retransmit_timeout,
597                                          &socket->other_peer,
598                                          ntohs (head->message->header.size),
599                                          &send_message_notify,
600                                          socket);
601   }
602   return ret;
603 }
604
605
606 /**
607  * Queues a message for sending using the mesh connection of a socket
608  *
609  * @param socket the socket whose mesh connection is used
610  * @param message the message to be sent
611  * @param finish_cb the callback to be called when the message is sent
612  * @param finish_cb_cls the closure for the callback
613  */
614 static void
615 queue_message (struct GNUNET_STREAM_Socket *socket,
616                struct GNUNET_STREAM_MessageHeader *message,
617                SendFinishCallback finish_cb,
618                void *finish_cb_cls)
619 {
620   struct MessageQueue *queue_entity;
621
622   GNUNET_assert 
623     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
624      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
625
626   LOG (GNUNET_ERROR_TYPE_DEBUG,
627        "%s: Queueing message of type %d and size %d\n",
628        GNUNET_i2s (&socket->other_peer),
629        ntohs (message->header.type),
630        ntohs (message->header.size));
631   GNUNET_assert (NULL != message);
632   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
633   queue_entity->message = message;
634   queue_entity->finish_cb = finish_cb;
635   queue_entity->finish_cb_cls = finish_cb_cls;
636   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
637                                     socket->queue_tail,
638                                     queue_entity);
639   if (NULL == socket->transmit_handle)
640   {
641     socket->retries = 0;
642     socket->transmit_handle = 
643       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
644                                          0, /* Corking */
645                                          1, /* Priority */
646                                          socket->retransmit_timeout,
647                                          &socket->other_peer,
648                                          ntohs (message->header.size),
649                                          &send_message_notify,
650                                          socket);
651   }
652 }
653
654
655 /**
656  * Copies a message and queues it for sending using the mesh connection of
657  * given socket 
658  *
659  * @param socket the socket whose mesh connection is used
660  * @param message the message to be sent
661  * @param finish_cb the callback to be called when the message is sent
662  * @param finish_cb_cls the closure for the callback
663  */
664 static void
665 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
666                         const struct GNUNET_STREAM_MessageHeader *message,
667                         SendFinishCallback finish_cb,
668                         void *finish_cb_cls)
669 {
670   struct GNUNET_STREAM_MessageHeader *msg_copy;
671   uint16_t size;
672   
673   size = ntohs (message->header.size);
674   msg_copy = GNUNET_malloc (size);
675   memcpy (msg_copy, message, size);
676   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
677 }
678
679
680 /**
681  * Callback function for sending ack message
682  *
683  * @param cls closure the ACK message created in ack_task
684  * @param size number of bytes available in buffer
685  * @param buf where the callee should write the message
686  * @return number of bytes written to buf
687  */
688 static size_t
689 send_ack_notify (void *cls, size_t size, void *buf)
690 {
691   struct GNUNET_STREAM_Socket *socket = cls;
692
693   if (0 == size)
694   {
695     LOG (GNUNET_ERROR_TYPE_DEBUG,
696          "%s called with size 0\n", __func__);
697     return 0;
698   }
699   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
700   
701   size = ntohs (socket->ack_msg->header.header.size);
702   memcpy (buf, socket->ack_msg, size);
703   
704   GNUNET_free (socket->ack_msg);
705   socket->ack_msg = NULL;
706   socket->ack_transmit_handle = NULL;
707   return size;
708 }
709
710
711 /**
712  * Writes data using the given socket. The amount of data written is limited by
713  * the receiver_window_size
714  *
715  * @param socket the socket to use
716  */
717 static void 
718 write_data (struct GNUNET_STREAM_Socket *socket);
719
720
721 /**
722  * Task for retransmitting data messages if they aren't ACK before their ack
723  * deadline 
724  *
725  * @param cls the socket
726  * @param tc the Task context
727  */
728 static void
729 retransmission_timeout_task (void *cls,
730                              const struct GNUNET_SCHEDULER_TaskContext *tc)
731 {
732   struct GNUNET_STREAM_Socket *socket = cls;
733   
734   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
735     return;
736
737   LOG (GNUNET_ERROR_TYPE_DEBUG,
738        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
739   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
740   write_data (socket);
741 }
742
743
744 /**
745  * Task for sending ACK message
746  *
747  * @param cls the socket
748  * @param tc the Task context
749  */
750 static void
751 ack_task (void *cls,
752           const struct GNUNET_SCHEDULER_TaskContext *tc)
753 {
754   struct GNUNET_STREAM_Socket *socket = cls;
755   struct GNUNET_STREAM_AckMessage *ack_msg;
756
757   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
758   {
759     return;
760   }
761   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
762   /* Create the ACK Message */
763   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
764   ack_msg->header.header.size = htons (sizeof (struct 
765                                                GNUNET_STREAM_AckMessage));
766   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
767   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
768   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
769   ack_msg->receive_window_remaining = 
770     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
771   socket->ack_msg = ack_msg;
772   /* Request MESH for sending ACK */
773   socket->ack_transmit_handle = 
774     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
775                                        0, /* Corking */
776                                        1, /* Priority */
777                                        socket->retransmit_timeout,
778                                        &socket->other_peer,
779                                        ntohs (ack_msg->header.header.size),
780                                        &send_ack_notify,
781                                        socket);
782 }
783
784
785 /**
786  * Retransmission task for shutdown messages
787  *
788  * @param cls the shutdown handle
789  * @param tc the Task Context
790  */
791 static void
792 close_msg_retransmission_task (void *cls,
793                                const struct GNUNET_SCHEDULER_TaskContext *tc)
794 {
795   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
796   struct GNUNET_STREAM_MessageHeader *msg;
797   struct GNUNET_STREAM_Socket *socket;
798
799   GNUNET_assert (NULL != shutdown_handle);
800   socket = shutdown_handle->socket;
801
802   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
803   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
804   switch (shutdown_handle->operation)
805   {
806   case SHUT_RDWR:
807     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
808     break;
809   case SHUT_RD:
810     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
811     break;
812   case SHUT_WR:
813     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
814     break;
815   default:
816     GNUNET_free (msg);
817     shutdown_handle->close_msg_retransmission_task_id = 
818       GNUNET_SCHEDULER_NO_TASK;
819     return;
820   }
821   queue_message (socket, msg, NULL, NULL);
822   shutdown_handle->close_msg_retransmission_task_id =
823     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
824                                   &close_msg_retransmission_task,
825                                   shutdown_handle);
826 }
827
828
829 /**
830  * Function to modify a bit in GNUNET_STREAM_AckBitmap
831  *
832  * @param bitmap the bitmap to modify
833  * @param bit the bit number to modify
834  * @param value GNUNET_YES to on, GNUNET_NO to off
835  */
836 static void
837 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
838                       unsigned int bit, 
839                       int value)
840 {
841   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
842   if (GNUNET_YES == value)
843     *bitmap |= (1LL << bit);
844   else
845     *bitmap &= ~(1LL << bit);
846 }
847
848
849 /**
850  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
851  *
852  * @param bitmap address of the bitmap that has to be checked
853  * @param bit the bit number to check
854  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
855  */
856 static uint8_t
857 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
858                       unsigned int bit)
859 {
860   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
861   return 0 != (*bitmap & (1LL << bit));
862 }
863
864
865 /**
866  * Writes data using the given socket. The amount of data written is limited by
867  * the receiver_window_size
868  *
869  * @param socket the socket to use
870  */
871 static void 
872 write_data (struct GNUNET_STREAM_Socket *socket)
873 {
874   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
875   int packet;                   /* Although an int, should never be negative */
876   int ack_packet;
877
878   ack_packet = -1;
879   /* Find the last acknowledged packet */
880   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
881   {      
882     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
883                                             packet))
884       ack_packet = packet;        
885     else if (NULL == io_handle->messages[packet])
886       break;
887   }
888   /* Resend packets which weren't ack'ed */
889   for (packet=0; packet < ack_packet; packet++)
890   {
891     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
892                                            packet))
893     {
894       LOG (GNUNET_ERROR_TYPE_DEBUG,
895            "%s: Placing DATA message with sequence %u in send queue\n",
896            GNUNET_i2s (&socket->other_peer),
897            ntohl (io_handle->messages[packet]->sequence_number));
898       copy_and_queue_message (socket,
899                               &io_handle->messages[packet]->header,
900                               NULL,
901                               NULL);
902     }
903   }
904   packet = ack_packet + 1;
905   /* Now send new packets if there is enough buffer space */
906   while ( (NULL != io_handle->messages[packet]) &&
907           (socket->receiver_window_available 
908            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
909           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
910   {
911     socket->receiver_window_available -= 
912       ntohs (io_handle->messages[packet]->header.header.size);
913     LOG (GNUNET_ERROR_TYPE_DEBUG,
914          "%s: Placing DATA message with sequence %u in send queue\n",
915          GNUNET_i2s (&socket->other_peer),
916          ntohl (io_handle->messages[packet]->sequence_number));
917     copy_and_queue_message (socket,
918                             &io_handle->messages[packet]->header,
919                             NULL,
920                             NULL);
921     packet++;
922   }
923   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
924     socket->retransmission_timeout_task_id = 
925       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
926                                     (GNUNET_TIME_UNIT_SECONDS, 8),
927                                     &retransmission_timeout_task,
928                                     socket);
929 }
930
931
932 /**
933  * Task for calling the read processor
934  *
935  * @param cls the socket
936  * @param tc the task context
937  */
938 static void
939 call_read_processor (void *cls,
940                      const struct GNUNET_SCHEDULER_TaskContext *tc)
941 {
942   struct GNUNET_STREAM_Socket *socket = cls;
943   size_t read_size;
944   size_t valid_read_size;
945   unsigned int packet;
946   uint32_t sequence_increase;
947   uint32_t offset_increase;
948
949   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
950   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
951     return;
952
953   if (NULL == socket->receive_buffer) 
954     return;
955
956   GNUNET_assert (NULL != socket->read_handle);
957   GNUNET_assert (NULL != socket->read_handle->proc);
958
959   /* Check the bitmap for any holes */
960   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
961   {
962     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
963                                            packet))
964       break;
965   }
966   /* We only call read processor if we have the first packet */
967   GNUNET_assert (0 < packet);
968   valid_read_size = 
969     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
970   GNUNET_assert (0 != valid_read_size);
971   /* Cancel the read_io_timeout_task */
972   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
973   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
974   /* Call the data processor */
975   LOG (GNUNET_ERROR_TYPE_DEBUG,
976        "%s: Calling read processor\n",
977        GNUNET_i2s (&socket->other_peer));
978   read_size = 
979     socket->read_handle->proc (socket->read_handle->proc_cls,
980                                socket->status,
981                                socket->receive_buffer + socket->copy_offset,
982                                valid_read_size);
983   LOG (GNUNET_ERROR_TYPE_DEBUG,
984        "%s: Read processor read %d bytes\n",
985        GNUNET_i2s (&socket->other_peer), read_size);
986   LOG (GNUNET_ERROR_TYPE_DEBUG,
987        "%s: Read processor completed successfully\n",
988        GNUNET_i2s (&socket->other_peer));
989   /* Free the read handle */
990   GNUNET_free (socket->read_handle);
991   socket->read_handle = NULL;
992   GNUNET_assert (read_size <= valid_read_size);
993   socket->copy_offset += read_size;
994   /* Determine upto which packet we can remove from the buffer */
995   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
996   {
997     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
998     { packet++; break; }
999     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1000       break;
1001   }
1002
1003   /* If no packets can be removed we can't move the buffer */
1004   if (0 == packet) return;
1005   sequence_increase = packet;
1006   LOG (GNUNET_ERROR_TYPE_DEBUG,
1007        "%s: Sequence increase after read processor completion: %u\n",
1008        GNUNET_i2s (&socket->other_peer), sequence_increase);
1009
1010   /* Shift the data in the receive buffer */
1011   socket->receive_buffer = 
1012     memmove (socket->receive_buffer,
1013              socket->receive_buffer 
1014              + socket->receive_buffer_boundaries[sequence_increase-1],
1015              socket->receive_buffer_size
1016              - socket->receive_buffer_boundaries[sequence_increase-1]);
1017   /* Shift the bitmap */
1018   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1019   /* Set read_sequence_number */
1020   socket->read_sequence_number += sequence_increase;
1021   /* Set read_offset */
1022   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1023   socket->read_offset += offset_increase;
1024   /* Fix copy_offset */
1025   GNUNET_assert (offset_increase <= socket->copy_offset);
1026   socket->copy_offset -= offset_increase;
1027   /* Fix relative boundaries */
1028   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1029   {
1030     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1031     {
1032       uint32_t ahead_buffer_boundary;
1033
1034       ahead_buffer_boundary = 
1035         socket->receive_buffer_boundaries[packet + sequence_increase];
1036       if (0 == ahead_buffer_boundary)
1037         socket->receive_buffer_boundaries[packet] = 0;
1038       else
1039       {
1040         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1041         socket->receive_buffer_boundaries[packet] = 
1042           ahead_buffer_boundary - offset_increase;
1043       }
1044     }
1045     else
1046       socket->receive_buffer_boundaries[packet] = 0;
1047   }
1048 }
1049
1050
1051 /**
1052  * Cancels the existing read io handle
1053  *
1054  * @param cls the closure from the SCHEDULER call
1055  * @param tc the task context
1056  */
1057 static void
1058 read_io_timeout (void *cls, 
1059                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1060 {
1061   struct GNUNET_STREAM_Socket *socket = cls;
1062   GNUNET_STREAM_DataProcessor proc;
1063   void *proc_cls;
1064
1065   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1066   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1067   {
1068     LOG (GNUNET_ERROR_TYPE_DEBUG,
1069          "%s: Read task timedout - Cancelling it\n",
1070          GNUNET_i2s (&socket->other_peer));
1071     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1072     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1073   }
1074   GNUNET_assert (NULL != socket->read_handle);
1075   proc = socket->read_handle->proc;
1076   proc_cls = socket->read_handle->proc_cls;
1077
1078   GNUNET_free (socket->read_handle);
1079   socket->read_handle = NULL;
1080   /* Call the read processor to signal timeout */
1081   proc (proc_cls,
1082         GNUNET_STREAM_TIMEOUT,
1083         NULL,
1084         0);
1085 }
1086
1087
1088 /**
1089  * Handler for DATA messages; Same for both client and server
1090  *
1091  * @param socket the socket through which the ack was received
1092  * @param tunnel connection to the other end
1093  * @param sender who sent the message
1094  * @param msg the data message
1095  * @param atsi performance data for the connection
1096  * @return GNUNET_OK to keep the connection open,
1097  *         GNUNET_SYSERR to close it (signal serious error)
1098  */
1099 static int
1100 handle_data (struct GNUNET_STREAM_Socket *socket,
1101              struct GNUNET_MESH_Tunnel *tunnel,
1102              const struct GNUNET_PeerIdentity *sender,
1103              const struct GNUNET_STREAM_DataMessage *msg,
1104              const struct GNUNET_ATS_Information*atsi)
1105 {
1106   const void *payload;
1107   uint32_t bytes_needed;
1108   uint32_t relative_offset;
1109   uint32_t relative_sequence_number;
1110   uint16_t size;
1111
1112   size = htons (msg->header.header.size);
1113   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1114   {
1115     GNUNET_break_op (0);
1116     return GNUNET_SYSERR;
1117   }
1118
1119   if (0 != memcmp (sender,
1120                    &socket->other_peer,
1121                    sizeof (struct GNUNET_PeerIdentity)))
1122   {
1123     LOG (GNUNET_ERROR_TYPE_DEBUG,
1124          "%s: Received DATA from non-confirming peer\n",
1125          GNUNET_i2s (&socket->other_peer));
1126     return GNUNET_YES;
1127   }
1128
1129   switch (socket->state)
1130   {
1131   case STATE_ESTABLISHED:
1132   case STATE_TRANSMIT_CLOSED:
1133   case STATE_TRANSMIT_CLOSE_WAIT:
1134       
1135     /* check if the message's sequence number is in the range we are
1136        expecting */
1137     relative_sequence_number = 
1138       ntohl (msg->sequence_number) - socket->read_sequence_number;
1139     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1140     {
1141       LOG (GNUNET_ERROR_TYPE_DEBUG,
1142            "%s: Ignoring received message with sequence number %u\n",
1143            GNUNET_i2s (&socket->other_peer),
1144            ntohl (msg->sequence_number));
1145       /* Start ACK sending task if one is not already present */
1146       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1147       {
1148         socket->ack_task_id = 
1149           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1150                                         (msg->ack_deadline),
1151                                         &ack_task,
1152                                         socket);
1153       }
1154       return GNUNET_YES;
1155     }
1156       
1157     /* Check if we have already seen this message */
1158     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1159                                             relative_sequence_number))
1160     {
1161       LOG (GNUNET_ERROR_TYPE_DEBUG,
1162            "%s: Ignoring already received message with sequence number %u\n",
1163            GNUNET_i2s (&socket->other_peer),
1164            ntohl (msg->sequence_number));
1165       /* Start ACK sending task if one is not already present */
1166       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1167       {
1168         socket->ack_task_id = 
1169           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1170                                         (msg->ack_deadline),
1171                                         &ack_task,
1172                                         socket);
1173       }
1174       return GNUNET_YES;
1175     }
1176
1177     LOG (GNUNET_ERROR_TYPE_DEBUG,
1178          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1179          GNUNET_i2s (&socket->other_peer),
1180          ntohl (msg->sequence_number),
1181          ntohs (msg->header.header.size),
1182          GNUNET_i2s (&socket->other_peer));
1183       
1184     /* Check if we have to allocate the buffer */
1185     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1186     relative_offset = ntohl (msg->offset) - socket->read_offset;
1187     bytes_needed = relative_offset + size;
1188     if (bytes_needed > socket->receive_buffer_size)
1189     {
1190       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1191       {
1192         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1193                                                  bytes_needed);
1194         socket->receive_buffer_size = bytes_needed;
1195       }
1196       else
1197       {
1198         LOG (GNUNET_ERROR_TYPE_DEBUG,
1199              "%s: Cannot accommodate packet %d as buffer is full\n",
1200              GNUNET_i2s (&socket->other_peer),
1201              ntohl (msg->sequence_number));
1202         return GNUNET_YES;
1203       }
1204     }
1205       
1206     /* Copy Data to buffer */
1207     payload = &msg[1];
1208     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1209     memcpy (socket->receive_buffer + relative_offset,
1210             payload,
1211             size);
1212     socket->receive_buffer_boundaries[relative_sequence_number] = 
1213       relative_offset + size;
1214       
1215     /* Modify the ACK bitmap */
1216     ackbitmap_modify_bit (&socket->ack_bitmap,
1217                           relative_sequence_number,
1218                           GNUNET_YES);
1219
1220     /* Start ACK sending task if one is not already present */
1221     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1222     {
1223       socket->ack_task_id = 
1224         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1225                                       (msg->ack_deadline),
1226                                       &ack_task,
1227                                       socket);
1228     }
1229
1230     if ((NULL != socket->read_handle) /* A read handle is waiting */
1231         /* There is no current read task */
1232         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1233         /* We have the first packet */
1234         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1235                                                0)))
1236     {
1237       LOG (GNUNET_ERROR_TYPE_DEBUG,
1238            "%s: Scheduling read processor\n",
1239            GNUNET_i2s (&socket->other_peer));
1240           
1241       socket->read_task_id = 
1242         GNUNET_SCHEDULER_add_now (&call_read_processor,
1243                                   socket);
1244     }
1245       
1246     break;
1247
1248   default:
1249     LOG (GNUNET_ERROR_TYPE_DEBUG,
1250          "%s: Received data message when it cannot be handled\n",
1251          GNUNET_i2s (&socket->other_peer));
1252     break;
1253   }
1254   return GNUNET_YES;
1255 }
1256
1257
1258 /**
1259  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1260  *
1261  * @param cls the socket (set from GNUNET_MESH_connect)
1262  * @param tunnel connection to the other end
1263  * @param tunnel_ctx place to store local state associated with the tunnel
1264  * @param sender who sent the message
1265  * @param message the actual message
1266  * @param atsi performance data for the connection
1267  * @return GNUNET_OK to keep the connection open,
1268  *         GNUNET_SYSERR to close it (signal serious error)
1269  */
1270 static int
1271 client_handle_data (void *cls,
1272                     struct GNUNET_MESH_Tunnel *tunnel,
1273                     void **tunnel_ctx,
1274                     const struct GNUNET_PeerIdentity *sender,
1275                     const struct GNUNET_MessageHeader *message,
1276                     const struct GNUNET_ATS_Information*atsi)
1277 {
1278   struct GNUNET_STREAM_Socket *socket = cls;
1279
1280   return handle_data (socket, 
1281                       tunnel, 
1282                       sender, 
1283                       (const struct GNUNET_STREAM_DataMessage *) message, 
1284                       atsi);
1285 }
1286
1287
1288 /**
1289  * Callback to set state to ESTABLISHED
1290  *
1291  * @param cls the closure from queue_message FIXME: document
1292  * @param socket the socket to requiring state change
1293  */
1294 static void
1295 set_state_established (void *cls,
1296                        struct GNUNET_STREAM_Socket *socket)
1297 {
1298   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1299        "%s: Attaining ESTABLISHED state\n",
1300        GNUNET_i2s (&socket->other_peer));
1301   socket->write_offset = 0;
1302   socket->read_offset = 0;
1303   socket->state = STATE_ESTABLISHED;
1304   if (NULL != socket->lsocket)
1305   {
1306     LOG (GNUNET_ERROR_TYPE_DEBUG,
1307          "%s: Calling listen callback\n",
1308          GNUNET_i2s (&socket->other_peer));
1309     if (GNUNET_SYSERR == 
1310         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1311                                     socket,
1312                                     &socket->other_peer))
1313     {
1314       socket->state = STATE_CLOSED;
1315       /* FIXME: We should close in a decent way (send RST) */
1316       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1317       GNUNET_free (socket);
1318     }
1319   }
1320   else if (NULL != socket->open_cb)
1321     socket->open_cb (socket->open_cls, socket);
1322 }
1323
1324
1325 /**
1326  * Callback to set state to HELLO_WAIT
1327  *
1328  * @param cls the closure from queue_message
1329  * @param socket the socket to requiring state change
1330  */
1331 static void
1332 set_state_hello_wait (void *cls,
1333                       struct GNUNET_STREAM_Socket *socket)
1334 {
1335   GNUNET_assert (STATE_INIT == socket->state);
1336   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1337        "%s: Attaining HELLO_WAIT state\n",
1338        GNUNET_i2s (&socket->other_peer));
1339   socket->state = STATE_HELLO_WAIT;
1340 }
1341
1342
1343 /**
1344  * Callback to set state to CLOSE_WAIT
1345  *
1346  * @param cls the closure from queue_message
1347  * @param socket the socket requiring state change
1348  */
1349 static void
1350 set_state_close_wait (void *cls,
1351                       struct GNUNET_STREAM_Socket *socket)
1352 {
1353   LOG (GNUNET_ERROR_TYPE_DEBUG,
1354        "%s: Attaing CLOSE_WAIT state\n",
1355        GNUNET_i2s (&socket->other_peer));
1356   socket->state = STATE_CLOSE_WAIT;
1357   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1358   socket->receive_buffer = NULL;
1359   socket->receive_buffer_size = 0;
1360 }
1361
1362
1363 /**
1364  * Callback to set state to RECEIVE_CLOSE_WAIT
1365  *
1366  * @param cls the closure from queue_message
1367  * @param socket the socket requiring state change
1368  */
1369 static void
1370 set_state_receive_close_wait (void *cls,
1371                               struct GNUNET_STREAM_Socket *socket)
1372 {
1373   LOG (GNUNET_ERROR_TYPE_DEBUG,
1374        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1375        GNUNET_i2s (&socket->other_peer));
1376   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1377   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1378   socket->receive_buffer = NULL;
1379   socket->receive_buffer_size = 0;
1380 }
1381
1382
1383 /**
1384  * Callback to set state to TRANSMIT_CLOSE_WAIT
1385  *
1386  * @param cls the closure from queue_message
1387  * @param socket the socket requiring state change
1388  */
1389 static void
1390 set_state_transmit_close_wait (void *cls,
1391                                struct GNUNET_STREAM_Socket *socket)
1392 {
1393   LOG (GNUNET_ERROR_TYPE_DEBUG,
1394        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1395        GNUNET_i2s (&socket->other_peer));
1396   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1397 }
1398
1399
1400 /**
1401  * Callback to set state to CLOSED
1402  *
1403  * @param cls the closure from queue_message
1404  * @param socket the socket requiring state change
1405  */
1406 static void
1407 set_state_closed (void *cls,
1408                   struct GNUNET_STREAM_Socket *socket)
1409 {
1410   socket->state = STATE_CLOSED;
1411 }
1412
1413
1414 /**
1415  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1416  * socket
1417  *
1418  * @param socket the socket for which this HelloAckMessage has to be generated
1419  * @return the HelloAckMessage
1420  */
1421 static struct GNUNET_STREAM_HelloAckMessage *
1422 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1423 {
1424   struct GNUNET_STREAM_HelloAckMessage *msg;
1425
1426   /* Get the random sequence number */
1427   if (GNUNET_YES == socket->testing_active)
1428     socket->write_sequence_number =
1429       socket->testing_set_write_sequence_number_value;
1430   else
1431     socket->write_sequence_number = 
1432       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1433   LOG (GNUNET_ERROR_TYPE_DEBUG,
1434        "%s: write sequence number %u\n",
1435        GNUNET_i2s (&socket->other_peer),
1436        (unsigned int) socket->write_sequence_number);
1437   
1438   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1439   msg->header.header.size = 
1440     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1441   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1442   msg->sequence_number = htonl (socket->write_sequence_number);
1443   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1444
1445   return msg;
1446 }
1447
1448
1449 /**
1450  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1451  *
1452  * @param cls the socket (set from GNUNET_MESH_connect)
1453  * @param tunnel connection to the other end
1454  * @param tunnel_ctx this is NULL
1455  * @param sender who sent the message
1456  * @param message the actual message
1457  * @param atsi performance data for the connection
1458  * @return GNUNET_OK to keep the connection open,
1459  *         GNUNET_SYSERR to close it (signal serious error)
1460  */
1461 static int
1462 client_handle_hello_ack (void *cls,
1463                          struct GNUNET_MESH_Tunnel *tunnel,
1464                          void **tunnel_ctx,
1465                          const struct GNUNET_PeerIdentity *sender,
1466                          const struct GNUNET_MessageHeader *message,
1467                          const struct GNUNET_ATS_Information*atsi)
1468 {
1469   struct GNUNET_STREAM_Socket *socket = cls;
1470   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1471   struct GNUNET_STREAM_HelloAckMessage *reply;
1472
1473   if (0 != memcmp (sender,
1474                    &socket->other_peer,
1475                    sizeof (struct GNUNET_PeerIdentity)))
1476   {
1477     LOG (GNUNET_ERROR_TYPE_DEBUG,
1478          "%s: Received HELLO_ACK from non-confirming peer\n",
1479          GNUNET_i2s (&socket->other_peer));
1480     return GNUNET_YES;
1481   }
1482   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1483   LOG (GNUNET_ERROR_TYPE_DEBUG,
1484        "%s: Received HELLO_ACK from %s\n",
1485        GNUNET_i2s (&socket->other_peer),
1486        GNUNET_i2s (&socket->other_peer));
1487
1488   GNUNET_assert (socket->tunnel == tunnel);
1489   switch (socket->state)
1490   {
1491   case STATE_HELLO_WAIT:
1492     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1493     LOG (GNUNET_ERROR_TYPE_DEBUG,
1494          "%s: Read sequence number %u\n",
1495          GNUNET_i2s (&socket->other_peer),
1496          (unsigned int) socket->read_sequence_number);
1497     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1498     reply = generate_hello_ack_msg (socket);
1499     queue_message (socket,
1500                    &reply->header, 
1501                    &set_state_established, 
1502                    NULL);      
1503     return GNUNET_OK;
1504   case STATE_ESTABLISHED:
1505   case STATE_RECEIVE_CLOSE_WAIT:
1506     // call statistics (# ACKs ignored++)
1507     return GNUNET_OK;
1508   case STATE_INIT:
1509   default:
1510     LOG (GNUNET_ERROR_TYPE_DEBUG,
1511          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1512          GNUNET_i2s (&socket->other_peer),
1513          GNUNET_i2s (&socket->other_peer),
1514          socket->state);
1515     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1516     return GNUNET_SYSERR;
1517   }
1518
1519 }
1520
1521
1522 /**
1523  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1524  *
1525  * @param cls the socket (set from GNUNET_MESH_connect)
1526  * @param tunnel connection to the other end
1527  * @param tunnel_ctx this is NULL
1528  * @param sender who sent the message
1529  * @param message the actual message
1530  * @param atsi performance data for the connection
1531  * @return GNUNET_OK to keep the connection open,
1532  *         GNUNET_SYSERR to close it (signal serious error)
1533  */
1534 static int
1535 client_handle_reset (void *cls,
1536                      struct GNUNET_MESH_Tunnel *tunnel,
1537                      void **tunnel_ctx,
1538                      const struct GNUNET_PeerIdentity *sender,
1539                      const struct GNUNET_MessageHeader *message,
1540                      const struct GNUNET_ATS_Information*atsi)
1541 {
1542   // struct GNUNET_STREAM_Socket *socket = cls;
1543
1544   return GNUNET_OK;
1545 }
1546
1547
1548 /**
1549  * Common message handler for handling TRANSMIT_CLOSE messages
1550  *
1551  * @param socket the socket through which the ack was received
1552  * @param tunnel connection to the other end
1553  * @param sender who sent the message
1554  * @param msg the transmit close message
1555  * @param atsi performance data for the connection
1556  * @return GNUNET_OK to keep the connection open,
1557  *         GNUNET_SYSERR to close it (signal serious error)
1558  */
1559 static int
1560 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1561                        struct GNUNET_MESH_Tunnel *tunnel,
1562                        const struct GNUNET_PeerIdentity *sender,
1563                        const struct GNUNET_STREAM_MessageHeader *msg,
1564                        const struct GNUNET_ATS_Information*atsi)
1565 {
1566   struct GNUNET_STREAM_MessageHeader *reply;
1567
1568   switch (socket->state)
1569   {
1570   case STATE_ESTABLISHED:
1571     socket->state = STATE_RECEIVE_CLOSED;
1572
1573     /* Send TRANSMIT_CLOSE_ACK */
1574     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1575     reply->header.type = 
1576       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1577     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1578     queue_message (socket, reply, NULL, NULL);
1579     break;
1580
1581   default:
1582     /* FIXME: Call statistics? */
1583     break;
1584   }
1585   return GNUNET_YES;
1586 }
1587
1588
1589 /**
1590  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1591  *
1592  * @param cls the socket (set from GNUNET_MESH_connect)
1593  * @param tunnel connection to the other end
1594  * @param tunnel_ctx this is NULL
1595  * @param sender who sent the message
1596  * @param message the actual message
1597  * @param atsi performance data for the connection
1598  * @return GNUNET_OK to keep the connection open,
1599  *         GNUNET_SYSERR to close it (signal serious error)
1600  */
1601 static int
1602 client_handle_transmit_close (void *cls,
1603                               struct GNUNET_MESH_Tunnel *tunnel,
1604                               void **tunnel_ctx,
1605                               const struct GNUNET_PeerIdentity *sender,
1606                               const struct GNUNET_MessageHeader *message,
1607                               const struct GNUNET_ATS_Information*atsi)
1608 {
1609   struct GNUNET_STREAM_Socket *socket = cls;
1610   
1611   return handle_transmit_close (socket,
1612                                 tunnel,
1613                                 sender,
1614                                 (struct GNUNET_STREAM_MessageHeader *)message,
1615                                 atsi);
1616 }
1617
1618
1619 /**
1620  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1621  *
1622  * @param socket the socket
1623  * @param tunnel connection to the other end
1624  * @param sender who sent the message
1625  * @param message the actual message
1626  * @param atsi performance data for the connection
1627  * @param operation the close operation which is being ACK'ed
1628  * @return GNUNET_OK to keep the connection open,
1629  *         GNUNET_SYSERR to close it (signal serious error)
1630  */
1631 static int
1632 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1633                           struct GNUNET_MESH_Tunnel *tunnel,
1634                           const struct GNUNET_PeerIdentity *sender,
1635                           const struct GNUNET_STREAM_MessageHeader *message,
1636                           const struct GNUNET_ATS_Information *atsi,
1637                           int operation)
1638 {
1639   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1640
1641   shutdown_handle = socket->shutdown_handle;
1642   if (NULL == shutdown_handle)
1643   {
1644     LOG (GNUNET_ERROR_TYPE_DEBUG,
1645          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1646          GNUNET_i2s (&socket->other_peer));
1647     return GNUNET_OK;
1648   }
1649
1650   switch (operation)
1651   {
1652   case SHUT_RDWR:
1653     switch (socket->state)
1654     {
1655     case STATE_CLOSE_WAIT:
1656       if (SHUT_RDWR != shutdown_handle->operation)
1657       {
1658         LOG (GNUNET_ERROR_TYPE_DEBUG,
1659              "%s: Received CLOSE_ACK when shutdown handle is not for "
1660              "SHUT_RDWR\n",
1661              GNUNET_i2s (&socket->other_peer));
1662         return GNUNET_OK;
1663       }
1664
1665       LOG (GNUNET_ERROR_TYPE_DEBUG,
1666            "%s: Received CLOSE_ACK from %s\n",
1667            GNUNET_i2s (&socket->other_peer),
1668            GNUNET_i2s (&socket->other_peer));
1669       socket->state = STATE_CLOSED;
1670       break;
1671     default:
1672       LOG (GNUNET_ERROR_TYPE_DEBUG,
1673            "%s: Received CLOSE_ACK when in it not expected\n",
1674            GNUNET_i2s (&socket->other_peer));
1675       return GNUNET_OK;
1676     }
1677     break;
1678
1679   case SHUT_RD:
1680     switch (socket->state)
1681     {
1682     case STATE_RECEIVE_CLOSE_WAIT:
1683       if (SHUT_RD != shutdown_handle->operation)
1684       {
1685         LOG (GNUNET_ERROR_TYPE_DEBUG,
1686              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1687              "is not for SHUT_RD\n",
1688              GNUNET_i2s (&socket->other_peer));
1689         return GNUNET_OK;
1690       }
1691
1692       LOG (GNUNET_ERROR_TYPE_DEBUG,
1693            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1694            GNUNET_i2s (&socket->other_peer),
1695            GNUNET_i2s (&socket->other_peer));
1696       socket->state = STATE_RECEIVE_CLOSED;
1697       break;
1698     default:
1699       LOG (GNUNET_ERROR_TYPE_DEBUG,
1700            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1701            GNUNET_i2s (&socket->other_peer));
1702       return GNUNET_OK;
1703     }
1704
1705     break;
1706   case SHUT_WR:
1707     switch (socket->state)
1708     {
1709     case STATE_TRANSMIT_CLOSE_WAIT:
1710       if (SHUT_WR != shutdown_handle->operation)
1711       {
1712         LOG (GNUNET_ERROR_TYPE_DEBUG,
1713              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1714              "is not for SHUT_WR\n",
1715              GNUNET_i2s (&socket->other_peer));
1716         return GNUNET_OK;
1717       }
1718
1719       LOG (GNUNET_ERROR_TYPE_DEBUG,
1720            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1721            GNUNET_i2s (&socket->other_peer),
1722            GNUNET_i2s (&socket->other_peer));
1723       socket->state = STATE_TRANSMIT_CLOSED;
1724       break;
1725     default:
1726       LOG (GNUNET_ERROR_TYPE_DEBUG,
1727            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1728            GNUNET_i2s (&socket->other_peer));
1729           
1730       return GNUNET_OK;
1731     }
1732     break;
1733   default:
1734     GNUNET_assert (0);
1735   }
1736
1737   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1738     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1739                                    operation);
1740   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1741   socket->shutdown_handle = NULL;
1742   if (GNUNET_SCHEDULER_NO_TASK
1743       != shutdown_handle->close_msg_retransmission_task_id)
1744   {
1745     GNUNET_SCHEDULER_cancel
1746       (shutdown_handle->close_msg_retransmission_task_id);
1747     shutdown_handle->close_msg_retransmission_task_id =
1748       GNUNET_SCHEDULER_NO_TASK;
1749   }
1750   return GNUNET_OK;
1751 }
1752
1753
1754 /**
1755  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1756  *
1757  * @param cls the socket (set from GNUNET_MESH_connect)
1758  * @param tunnel connection to the other end
1759  * @param tunnel_ctx this is NULL
1760  * @param sender who sent the message
1761  * @param message the actual message
1762  * @param atsi performance data for the connection
1763  * @return GNUNET_OK to keep the connection open,
1764  *         GNUNET_SYSERR to close it (signal serious error)
1765  */
1766 static int
1767 client_handle_transmit_close_ack (void *cls,
1768                                   struct GNUNET_MESH_Tunnel *tunnel,
1769                                   void **tunnel_ctx,
1770                                   const struct GNUNET_PeerIdentity *sender,
1771                                   const struct GNUNET_MessageHeader *message,
1772                                   const struct GNUNET_ATS_Information*atsi)
1773 {
1774   struct GNUNET_STREAM_Socket *socket = cls;
1775
1776   return handle_generic_close_ack (socket,
1777                                    tunnel,
1778                                    sender,
1779                                    (const struct GNUNET_STREAM_MessageHeader *)
1780                                    message,
1781                                    atsi,
1782                                    SHUT_WR);
1783 }
1784
1785
1786 /**
1787  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1788  *
1789  * @param socket the socket
1790  * @param tunnel connection to the other end
1791  * @param sender who sent the message
1792  * @param message the actual message
1793  * @param atsi performance data for the connection
1794  * @return GNUNET_OK to keep the connection open,
1795  *         GNUNET_SYSERR to close it (signal serious error)
1796  */
1797 static int
1798 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1799                       struct GNUNET_MESH_Tunnel *tunnel,
1800                       const struct GNUNET_PeerIdentity *sender,
1801                       const struct GNUNET_STREAM_MessageHeader *message,
1802                       const struct GNUNET_ATS_Information *atsi)
1803 {
1804   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1805
1806   switch (socket->state)
1807   {
1808   case STATE_INIT:
1809   case STATE_LISTEN:
1810   case STATE_HELLO_WAIT:
1811     LOG (GNUNET_ERROR_TYPE_DEBUG,
1812          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1813          GNUNET_i2s (&socket->other_peer));
1814     return GNUNET_OK;
1815   default:
1816     break;
1817   }
1818   
1819   LOG (GNUNET_ERROR_TYPE_DEBUG,
1820        "%s: Received RECEIVE_CLOSE from %s\n",
1821        GNUNET_i2s (&socket->other_peer),
1822        GNUNET_i2s (&socket->other_peer));
1823   receive_close_ack =
1824     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1825   receive_close_ack->header.size =
1826     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1827   receive_close_ack->header.type =
1828     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1829   queue_message (socket,
1830                  receive_close_ack,
1831                  &set_state_closed,
1832                  NULL);
1833   
1834   /* FIXME: Handle the case where write handle is present; the write operation
1835      should be deemed as finised and the write continuation callback
1836      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1837   return GNUNET_OK;
1838 }
1839
1840
1841 /**
1842  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1843  *
1844  * @param cls the socket (set from GNUNET_MESH_connect)
1845  * @param tunnel connection to the other end
1846  * @param tunnel_ctx this is NULL
1847  * @param sender who sent the message
1848  * @param message the actual message
1849  * @param atsi performance data for the connection
1850  * @return GNUNET_OK to keep the connection open,
1851  *         GNUNET_SYSERR to close it (signal serious error)
1852  */
1853 static int
1854 client_handle_receive_close (void *cls,
1855                              struct GNUNET_MESH_Tunnel *tunnel,
1856                              void **tunnel_ctx,
1857                              const struct GNUNET_PeerIdentity *sender,
1858                              const struct GNUNET_MessageHeader *message,
1859                              const struct GNUNET_ATS_Information*atsi)
1860 {
1861   struct GNUNET_STREAM_Socket *socket = cls;
1862
1863   return
1864     handle_receive_close (socket,
1865                           tunnel,
1866                           sender,
1867                           (const struct GNUNET_STREAM_MessageHeader *) message,
1868                           atsi);
1869 }
1870
1871
1872 /**
1873  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1874  *
1875  * @param cls the socket (set from GNUNET_MESH_connect)
1876  * @param tunnel connection to the other end
1877  * @param tunnel_ctx this is NULL
1878  * @param sender who sent the message
1879  * @param message the actual message
1880  * @param atsi performance data for the connection
1881  * @return GNUNET_OK to keep the connection open,
1882  *         GNUNET_SYSERR to close it (signal serious error)
1883  */
1884 static int
1885 client_handle_receive_close_ack (void *cls,
1886                                  struct GNUNET_MESH_Tunnel *tunnel,
1887                                  void **tunnel_ctx,
1888                                  const struct GNUNET_PeerIdentity *sender,
1889                                  const struct GNUNET_MessageHeader *message,
1890                                  const struct GNUNET_ATS_Information*atsi)
1891 {
1892   struct GNUNET_STREAM_Socket *socket = cls;
1893
1894   return handle_generic_close_ack (socket,
1895                                    tunnel,
1896                                    sender,
1897                                    (const struct GNUNET_STREAM_MessageHeader *)
1898                                    message,
1899                                    atsi,
1900                                    SHUT_RD);
1901 }
1902
1903
1904 /**
1905  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1906  *
1907  * @param socket the socket
1908  * @param tunnel connection to the other end
1909  * @param sender who sent the message
1910  * @param message the actual message
1911  * @param atsi performance data for the connection
1912  * @return GNUNET_OK to keep the connection open,
1913  *         GNUNET_SYSERR to close it (signal serious error)
1914  */
1915 static int
1916 handle_close (struct GNUNET_STREAM_Socket *socket,
1917               struct GNUNET_MESH_Tunnel *tunnel,
1918               const struct GNUNET_PeerIdentity *sender,
1919               const struct GNUNET_STREAM_MessageHeader *message,
1920               const struct GNUNET_ATS_Information*atsi)
1921 {
1922   struct GNUNET_STREAM_MessageHeader *close_ack;
1923
1924   switch (socket->state)
1925   {
1926   case STATE_INIT:
1927   case STATE_LISTEN:
1928   case STATE_HELLO_WAIT:
1929     LOG (GNUNET_ERROR_TYPE_DEBUG,
1930          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1931          GNUNET_i2s (&socket->other_peer));
1932     return GNUNET_OK;
1933   default:
1934     break;
1935   }
1936
1937   LOG (GNUNET_ERROR_TYPE_DEBUG,
1938        "%s: Received CLOSE from %s\n",
1939        GNUNET_i2s (&socket->other_peer),
1940        GNUNET_i2s (&socket->other_peer));
1941   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1942   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1943   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1944   queue_message (socket,
1945                  close_ack,
1946                  &set_state_closed,
1947                  NULL);
1948   if (socket->state == STATE_CLOSED)
1949     return GNUNET_OK;
1950
1951   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1952   socket->receive_buffer = NULL;
1953   socket->receive_buffer_size = 0;
1954   return GNUNET_OK;
1955 }
1956
1957
1958 /**
1959  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1960  *
1961  * @param cls the socket (set from GNUNET_MESH_connect)
1962  * @param tunnel connection to the other end
1963  * @param tunnel_ctx this is NULL
1964  * @param sender who sent the message
1965  * @param message the actual message
1966  * @param atsi performance data for the connection
1967  * @return GNUNET_OK to keep the connection open,
1968  *         GNUNET_SYSERR to close it (signal serious error)
1969  */
1970 static int
1971 client_handle_close (void *cls,
1972                      struct GNUNET_MESH_Tunnel *tunnel,
1973                      void **tunnel_ctx,
1974                      const struct GNUNET_PeerIdentity *sender,
1975                      const struct GNUNET_MessageHeader *message,
1976                      const struct GNUNET_ATS_Information*atsi)
1977 {
1978   struct GNUNET_STREAM_Socket *socket = cls;
1979
1980   return handle_close (socket,
1981                        tunnel,
1982                        sender,
1983                        (const struct GNUNET_STREAM_MessageHeader *) message,
1984                        atsi);
1985 }
1986
1987
1988 /**
1989  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1990  *
1991  * @param cls the socket (set from GNUNET_MESH_connect)
1992  * @param tunnel connection to the other end
1993  * @param tunnel_ctx this is NULL
1994  * @param sender who sent the message
1995  * @param message the actual message
1996  * @param atsi performance data for the connection
1997  * @return GNUNET_OK to keep the connection open,
1998  *         GNUNET_SYSERR to close it (signal serious error)
1999  */
2000 static int
2001 client_handle_close_ack (void *cls,
2002                          struct GNUNET_MESH_Tunnel *tunnel,
2003                          void **tunnel_ctx,
2004                          const struct GNUNET_PeerIdentity *sender,
2005                          const struct GNUNET_MessageHeader *message,
2006                          const struct GNUNET_ATS_Information *atsi)
2007 {
2008   struct GNUNET_STREAM_Socket *socket = cls;
2009
2010   return handle_generic_close_ack (socket,
2011                                    tunnel,
2012                                    sender,
2013                                    (const struct GNUNET_STREAM_MessageHeader *) 
2014                                    message,
2015                                    atsi,
2016                                    SHUT_RDWR);
2017 }
2018
2019 /*****************************/
2020 /* Server's Message Handlers */
2021 /*****************************/
2022
2023 /**
2024  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2025  *
2026  * @param cls the closure
2027  * @param tunnel connection to the other end
2028  * @param tunnel_ctx the socket
2029  * @param sender who sent the message
2030  * @param message the actual message
2031  * @param atsi performance data for the connection
2032  * @return GNUNET_OK to keep the connection open,
2033  *         GNUNET_SYSERR to close it (signal serious error)
2034  */
2035 static int
2036 server_handle_data (void *cls,
2037                     struct GNUNET_MESH_Tunnel *tunnel,
2038                     void **tunnel_ctx,
2039                     const struct GNUNET_PeerIdentity *sender,
2040                     const struct GNUNET_MessageHeader *message,
2041                     const struct GNUNET_ATS_Information*atsi)
2042 {
2043   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2044
2045   return handle_data (socket,
2046                       tunnel,
2047                       sender,
2048                       (const struct GNUNET_STREAM_DataMessage *)message,
2049                       atsi);
2050 }
2051
2052
2053 /**
2054  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2055  *
2056  * @param cls the closure
2057  * @param tunnel connection to the other end
2058  * @param tunnel_ctx the socket
2059  * @param sender who sent the message
2060  * @param message the actual message
2061  * @param atsi performance data for the connection
2062  * @return GNUNET_OK to keep the connection open,
2063  *         GNUNET_SYSERR to close it (signal serious error)
2064  */
2065 static int
2066 server_handle_hello (void *cls,
2067                      struct GNUNET_MESH_Tunnel *tunnel,
2068                      void **tunnel_ctx,
2069                      const struct GNUNET_PeerIdentity *sender,
2070                      const struct GNUNET_MessageHeader *message,
2071                      const struct GNUNET_ATS_Information*atsi)
2072 {
2073   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2074   struct GNUNET_STREAM_HelloAckMessage *reply;
2075
2076   if (0 != memcmp (sender,
2077                    &socket->other_peer,
2078                    sizeof (struct GNUNET_PeerIdentity)))
2079   {
2080     LOG (GNUNET_ERROR_TYPE_DEBUG,
2081          "%s: Received HELLO from non-confirming peer\n",
2082          GNUNET_i2s (&socket->other_peer));
2083     return GNUNET_YES;
2084   }
2085
2086   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2087                  ntohs (message->type));
2088   GNUNET_assert (socket->tunnel == tunnel);
2089   LOG (GNUNET_ERROR_TYPE_DEBUG,
2090        "%s: Received HELLO from %s\n", 
2091        GNUNET_i2s (&socket->other_peer),
2092        GNUNET_i2s (&socket->other_peer));
2093
2094   if (STATE_INIT == socket->state)
2095   {
2096     reply = generate_hello_ack_msg (socket);
2097     queue_message (socket, 
2098                    &reply->header,
2099                    &set_state_hello_wait, 
2100                    NULL);
2101   }
2102   else
2103   {
2104     LOG (GNUNET_ERROR_TYPE_DEBUG,
2105          "%s: Client sent HELLO when in state %d\n", 
2106          GNUNET_i2s (&socket->other_peer),
2107          socket->state);
2108     /* FIXME: Send RESET? */
2109       
2110   }
2111   return GNUNET_OK;
2112 }
2113
2114
2115 /**
2116  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2117  *
2118  * @param cls the closure
2119  * @param tunnel connection to the other end
2120  * @param tunnel_ctx the socket
2121  * @param sender who sent the message
2122  * @param message the actual message
2123  * @param atsi performance data for the connection
2124  * @return GNUNET_OK to keep the connection open,
2125  *         GNUNET_SYSERR to close it (signal serious error)
2126  */
2127 static int
2128 server_handle_hello_ack (void *cls,
2129                          struct GNUNET_MESH_Tunnel *tunnel,
2130                          void **tunnel_ctx,
2131                          const struct GNUNET_PeerIdentity *sender,
2132                          const struct GNUNET_MessageHeader *message,
2133                          const struct GNUNET_ATS_Information*atsi)
2134 {
2135   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2136   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2137
2138   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2139                  ntohs (message->type));
2140   GNUNET_assert (socket->tunnel == tunnel);
2141   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2142   if (STATE_HELLO_WAIT == socket->state)
2143   {
2144     LOG (GNUNET_ERROR_TYPE_DEBUG,
2145          "%s: Received HELLO_ACK from %s\n",
2146          GNUNET_i2s (&socket->other_peer),
2147          GNUNET_i2s (&socket->other_peer));
2148     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2149     LOG (GNUNET_ERROR_TYPE_DEBUG,
2150          "%s: Read sequence number %u\n",
2151          GNUNET_i2s (&socket->other_peer),
2152          (unsigned int) socket->read_sequence_number);
2153     socket->receiver_window_available = 
2154       ntohl (ack_message->receiver_window_size);
2155     /* Attain ESTABLISHED state */
2156     set_state_established (NULL, socket);
2157   }
2158   else
2159   {
2160     LOG (GNUNET_ERROR_TYPE_DEBUG,
2161          "Client sent HELLO_ACK when in state %d\n", socket->state);
2162     /* FIXME: Send RESET? */
2163       
2164   }
2165   return GNUNET_OK;
2166 }
2167
2168
2169 /**
2170  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2171  *
2172  * @param cls the closure
2173  * @param tunnel connection to the other end
2174  * @param tunnel_ctx the socket
2175  * @param sender who sent the message
2176  * @param message the actual message
2177  * @param atsi performance data for the connection
2178  * @return GNUNET_OK to keep the connection open,
2179  *         GNUNET_SYSERR to close it (signal serious error)
2180  */
2181 static int
2182 server_handle_reset (void *cls,
2183                      struct GNUNET_MESH_Tunnel *tunnel,
2184                      void **tunnel_ctx,
2185                      const struct GNUNET_PeerIdentity *sender,
2186                      const struct GNUNET_MessageHeader *message,
2187                      const struct GNUNET_ATS_Information*atsi)
2188 {
2189   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2190
2191   return GNUNET_OK;
2192 }
2193
2194
2195 /**
2196  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2197  *
2198  * @param cls the closure
2199  * @param tunnel connection to the other end
2200  * @param tunnel_ctx the socket
2201  * @param sender who sent the message
2202  * @param message the actual message
2203  * @param atsi performance data for the connection
2204  * @return GNUNET_OK to keep the connection open,
2205  *         GNUNET_SYSERR to close it (signal serious error)
2206  */
2207 static int
2208 server_handle_transmit_close (void *cls,
2209                               struct GNUNET_MESH_Tunnel *tunnel,
2210                               void **tunnel_ctx,
2211                               const struct GNUNET_PeerIdentity *sender,
2212                               const struct GNUNET_MessageHeader *message,
2213                               const struct GNUNET_ATS_Information*atsi)
2214 {
2215   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2216
2217   return handle_transmit_close (socket,
2218                                 tunnel,
2219                                 sender,
2220                                 (struct GNUNET_STREAM_MessageHeader *)message,
2221                                 atsi);
2222 }
2223
2224
2225 /**
2226  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2227  *
2228  * @param cls the closure
2229  * @param tunnel connection to the other end
2230  * @param tunnel_ctx the socket
2231  * @param sender who sent the message
2232  * @param message the actual message
2233  * @param atsi performance data for the connection
2234  * @return GNUNET_OK to keep the connection open,
2235  *         GNUNET_SYSERR to close it (signal serious error)
2236  */
2237 static int
2238 server_handle_transmit_close_ack (void *cls,
2239                                   struct GNUNET_MESH_Tunnel *tunnel,
2240                                   void **tunnel_ctx,
2241                                   const struct GNUNET_PeerIdentity *sender,
2242                                   const struct GNUNET_MessageHeader *message,
2243                                   const struct GNUNET_ATS_Information*atsi)
2244 {
2245   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2246
2247   return handle_generic_close_ack (socket,
2248                                    tunnel,
2249                                    sender,
2250                                    (const struct GNUNET_STREAM_MessageHeader *)
2251                                    message,
2252                                    atsi,
2253                                    SHUT_WR);
2254 }
2255
2256
2257 /**
2258  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2259  *
2260  * @param cls the closure
2261  * @param tunnel connection to the other end
2262  * @param tunnel_ctx the socket
2263  * @param sender who sent the message
2264  * @param message the actual message
2265  * @param atsi performance data for the connection
2266  * @return GNUNET_OK to keep the connection open,
2267  *         GNUNET_SYSERR to close it (signal serious error)
2268  */
2269 static int
2270 server_handle_receive_close (void *cls,
2271                              struct GNUNET_MESH_Tunnel *tunnel,
2272                              void **tunnel_ctx,
2273                              const struct GNUNET_PeerIdentity *sender,
2274                              const struct GNUNET_MessageHeader *message,
2275                              const struct GNUNET_ATS_Information*atsi)
2276 {
2277   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2278
2279   return
2280     handle_receive_close (socket,
2281                           tunnel,
2282                           sender,
2283                           (const struct GNUNET_STREAM_MessageHeader *) message,
2284                           atsi);
2285 }
2286
2287
2288 /**
2289  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2290  *
2291  * @param cls the closure
2292  * @param tunnel connection to the other end
2293  * @param tunnel_ctx the socket
2294  * @param sender who sent the message
2295  * @param message the actual message
2296  * @param atsi performance data for the connection
2297  * @return GNUNET_OK to keep the connection open,
2298  *         GNUNET_SYSERR to close it (signal serious error)
2299  */
2300 static int
2301 server_handle_receive_close_ack (void *cls,
2302                                  struct GNUNET_MESH_Tunnel *tunnel,
2303                                  void **tunnel_ctx,
2304                                  const struct GNUNET_PeerIdentity *sender,
2305                                  const struct GNUNET_MessageHeader *message,
2306                                  const struct GNUNET_ATS_Information*atsi)
2307 {
2308   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2309
2310   return handle_generic_close_ack (socket,
2311                                    tunnel,
2312                                    sender,
2313                                    (const struct GNUNET_STREAM_MessageHeader *)
2314                                    message,
2315                                    atsi,
2316                                    SHUT_RD);
2317 }
2318
2319
2320 /**
2321  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2322  *
2323  * @param cls the listen socket (from GNUNET_MESH_connect in
2324  *          GNUNET_STREAM_listen) 
2325  * @param tunnel connection to the other end
2326  * @param tunnel_ctx the socket
2327  * @param sender who sent the message
2328  * @param message the actual message
2329  * @param atsi performance data for the connection
2330  * @return GNUNET_OK to keep the connection open,
2331  *         GNUNET_SYSERR to close it (signal serious error)
2332  */
2333 static int
2334 server_handle_close (void *cls,
2335                      struct GNUNET_MESH_Tunnel *tunnel,
2336                      void **tunnel_ctx,
2337                      const struct GNUNET_PeerIdentity *sender,
2338                      const struct GNUNET_MessageHeader *message,
2339                      const struct GNUNET_ATS_Information*atsi)
2340 {
2341   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2342   
2343   return handle_close (socket,
2344                        tunnel,
2345                        sender,
2346                        (const struct GNUNET_STREAM_MessageHeader *) message,
2347                        atsi);
2348 }
2349
2350
2351 /**
2352  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2353  *
2354  * @param cls the closure
2355  * @param tunnel connection to the other end
2356  * @param tunnel_ctx the socket
2357  * @param sender who sent the message
2358  * @param message the actual message
2359  * @param atsi performance data for the connection
2360  * @return GNUNET_OK to keep the connection open,
2361  *         GNUNET_SYSERR to close it (signal serious error)
2362  */
2363 static int
2364 server_handle_close_ack (void *cls,
2365                          struct GNUNET_MESH_Tunnel *tunnel,
2366                          void **tunnel_ctx,
2367                          const struct GNUNET_PeerIdentity *sender,
2368                          const struct GNUNET_MessageHeader *message,
2369                          const struct GNUNET_ATS_Information*atsi)
2370 {
2371   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2372
2373   return handle_generic_close_ack (socket,
2374                                    tunnel,
2375                                    sender,
2376                                    (const struct GNUNET_STREAM_MessageHeader *) 
2377                                    message,
2378                                    atsi,
2379                                    SHUT_RDWR);
2380 }
2381
2382
2383 /**
2384  * Handler for DATA_ACK messages
2385  *
2386  * @param socket the socket through which the ack was received
2387  * @param tunnel connection to the other end
2388  * @param sender who sent the message
2389  * @param ack the acknowledgment message
2390  * @param atsi performance data for the connection
2391  * @return GNUNET_OK to keep the connection open,
2392  *         GNUNET_SYSERR to close it (signal serious error)
2393  */
2394 static int
2395 handle_ack (struct GNUNET_STREAM_Socket *socket,
2396             struct GNUNET_MESH_Tunnel *tunnel,
2397             const struct GNUNET_PeerIdentity *sender,
2398             const struct GNUNET_STREAM_AckMessage *ack,
2399             const struct GNUNET_ATS_Information*atsi)
2400 {
2401   unsigned int packet;
2402   int need_retransmission;
2403   uint32_t sequence_difference;
2404   
2405   if (0 != memcmp (sender,
2406                    &socket->other_peer,
2407                    sizeof (struct GNUNET_PeerIdentity)))
2408   {
2409     LOG (GNUNET_ERROR_TYPE_DEBUG,
2410          "%s: Received ACK from non-confirming peer\n",
2411          GNUNET_i2s (&socket->other_peer));
2412     return GNUNET_YES;
2413   }
2414   switch (socket->state)
2415   {
2416   case (STATE_ESTABLISHED):
2417   case (STATE_RECEIVE_CLOSED):
2418   case (STATE_RECEIVE_CLOSE_WAIT):
2419     if (NULL == socket->write_handle)
2420     {
2421       LOG (GNUNET_ERROR_TYPE_DEBUG,
2422            "%s: Received DATA_ACK when write_handle is NULL\n",
2423            GNUNET_i2s (&socket->other_peer));
2424       return GNUNET_OK;
2425     }
2426     /* FIXME: increment in the base sequence number is breaking current flow
2427      */
2428     if (!((socket->write_sequence_number 
2429            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2430     {
2431       LOG (GNUNET_ERROR_TYPE_DEBUG,
2432            "%s: Received DATA_ACK with unexpected base sequence number\n",
2433            GNUNET_i2s (&socket->other_peer));
2434       LOG (GNUNET_ERROR_TYPE_DEBUG,
2435            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2436            GNUNET_i2s (&socket->other_peer),
2437            socket->write_sequence_number,
2438            ntohl (ack->base_sequence_number));
2439       return GNUNET_OK;
2440     }
2441     /* FIXME: include the case when write_handle is cancelled - ignore the 
2442        acks */
2443     LOG (GNUNET_ERROR_TYPE_DEBUG,
2444          "%s: Received DATA_ACK from %s\n",
2445          GNUNET_i2s (&socket->other_peer),
2446          GNUNET_i2s (&socket->other_peer));
2447       
2448     /* Cancel the retransmission task */
2449     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2450     {
2451       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2452       socket->retransmission_timeout_task_id = 
2453         GNUNET_SCHEDULER_NO_TASK;
2454     }
2455     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2456     {
2457       if (NULL == socket->write_handle->messages[packet]) break;
2458       /* BS: Base sequence from ack; PS: sequence num of current packet */
2459       sequence_difference = ntohl (ack->base_sequence_number)
2460         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2461       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2462       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2463           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2464               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2465       {
2466         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2467                               GNUNET_YES);
2468         continue;
2469       }
2470       if (GNUNET_YES == 
2471           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2472                                 -sequence_difference))/*inversion as PS >= BS */
2473       {
2474         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2475                               GNUNET_YES);
2476       }
2477     }
2478     /* Update the receive window remaining
2479        FIXME : Should update with the value from a data ack with greater
2480        sequence number */
2481     socket->receiver_window_available = 
2482       ntohl (ack->receive_window_remaining);
2483     /* Check if we have received all acknowledgements */
2484     need_retransmission = GNUNET_NO;
2485     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2486     {
2487       if (NULL == socket->write_handle->messages[packet]) break;
2488       if (GNUNET_YES != ackbitmap_is_bit_set 
2489           (&socket->write_handle->ack_bitmap,packet))
2490       {
2491         need_retransmission = GNUNET_YES;
2492         break;
2493       }
2494     }
2495     if (GNUNET_YES == need_retransmission)
2496     {
2497       write_data (socket);
2498     }
2499     else      /* We have to call the write continuation callback now */
2500     {
2501       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2502       
2503       /* Free the packets */
2504       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2505       {
2506         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2507       }
2508       write_handle = socket->write_handle;
2509       socket->write_handle = NULL;
2510       if (NULL != write_handle->write_cont)
2511         write_handle->write_cont (write_handle->write_cont_cls,
2512                                   socket->status,
2513                                   write_handle->size);
2514       /* We are done with the write handle - Freeing it */
2515       GNUNET_free (write_handle);
2516       LOG (GNUNET_ERROR_TYPE_DEBUG,
2517            "%s: Write completion callback completed\n",
2518            GNUNET_i2s (&socket->other_peer));      
2519     }
2520     break;
2521   default:
2522     break;
2523   }
2524   return GNUNET_OK;
2525 }
2526
2527
2528 /**
2529  * Handler for DATA_ACK messages
2530  *
2531  * @param cls the 'struct GNUNET_STREAM_Socket'
2532  * @param tunnel connection to the other end
2533  * @param tunnel_ctx unused
2534  * @param sender who sent the message
2535  * @param message the actual message
2536  * @param atsi performance data for the connection
2537  * @return GNUNET_OK to keep the connection open,
2538  *         GNUNET_SYSERR to close it (signal serious error)
2539  */
2540 static int
2541 client_handle_ack (void *cls,
2542                    struct GNUNET_MESH_Tunnel *tunnel,
2543                    void **tunnel_ctx,
2544                    const struct GNUNET_PeerIdentity *sender,
2545                    const struct GNUNET_MessageHeader *message,
2546                    const struct GNUNET_ATS_Information*atsi)
2547 {
2548   struct GNUNET_STREAM_Socket *socket = cls;
2549   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2550  
2551   return handle_ack (socket, tunnel, sender, ack, atsi);
2552 }
2553
2554
2555 /**
2556  * Handler for DATA_ACK messages
2557  *
2558  * @param cls the server's listen socket
2559  * @param tunnel connection to the other end
2560  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2561  * @param sender who sent the message
2562  * @param message the actual message
2563  * @param atsi performance data for the connection
2564  * @return GNUNET_OK to keep the connection open,
2565  *         GNUNET_SYSERR to close it (signal serious error)
2566  */
2567 static int
2568 server_handle_ack (void *cls,
2569                    struct GNUNET_MESH_Tunnel *tunnel,
2570                    void **tunnel_ctx,
2571                    const struct GNUNET_PeerIdentity *sender,
2572                    const struct GNUNET_MessageHeader *message,
2573                    const struct GNUNET_ATS_Information*atsi)
2574 {
2575   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2576   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2577  
2578   return handle_ack (socket, tunnel, sender, ack, atsi);
2579 }
2580
2581
2582 /**
2583  * For client message handlers, the stream socket is in the
2584  * closure argument.
2585  */
2586 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2587   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2588   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2589    sizeof (struct GNUNET_STREAM_AckMessage) },
2590   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2591    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2592   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2593    sizeof (struct GNUNET_STREAM_MessageHeader)},
2594   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2595    sizeof (struct GNUNET_STREAM_MessageHeader)},
2596   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2597    sizeof (struct GNUNET_STREAM_MessageHeader)},
2598   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2599    sizeof (struct GNUNET_STREAM_MessageHeader)},
2600   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2601    sizeof (struct GNUNET_STREAM_MessageHeader)},
2602   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2603    sizeof (struct GNUNET_STREAM_MessageHeader)},
2604   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2605    sizeof (struct GNUNET_STREAM_MessageHeader)},
2606   {NULL, 0, 0}
2607 };
2608
2609
2610 /**
2611  * For server message handlers, the stream socket is in the
2612  * tunnel context, and the listen socket in the closure argument.
2613  */
2614 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2615   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2616   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2617    sizeof (struct GNUNET_STREAM_AckMessage) },
2618   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2619    sizeof (struct GNUNET_STREAM_MessageHeader)},
2620   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2621    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2622   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2623    sizeof (struct GNUNET_STREAM_MessageHeader)},
2624   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2625    sizeof (struct GNUNET_STREAM_MessageHeader)},
2626   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2627    sizeof (struct GNUNET_STREAM_MessageHeader)},
2628   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2629    sizeof (struct GNUNET_STREAM_MessageHeader)},
2630   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2631    sizeof (struct GNUNET_STREAM_MessageHeader)},
2632   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2633    sizeof (struct GNUNET_STREAM_MessageHeader)},
2634   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2635    sizeof (struct GNUNET_STREAM_MessageHeader)},
2636   {NULL, 0, 0}
2637 };
2638
2639
2640 /**
2641  * Function called when our target peer is connected to our tunnel
2642  *
2643  * @param cls the socket for which this tunnel is created
2644  * @param peer the peer identity of the target
2645  * @param atsi performance data for the connection
2646  */
2647 static void
2648 mesh_peer_connect_callback (void *cls,
2649                             const struct GNUNET_PeerIdentity *peer,
2650                             const struct GNUNET_ATS_Information * atsi)
2651 {
2652   struct GNUNET_STREAM_Socket *socket = cls;
2653   struct GNUNET_STREAM_MessageHeader *message;
2654   
2655   if (0 != memcmp (peer,
2656                    &socket->other_peer,
2657                    sizeof (struct GNUNET_PeerIdentity)))
2658   {
2659     LOG (GNUNET_ERROR_TYPE_DEBUG,
2660          "%s: A peer which is not our target has connected to our tunnel\n",
2661          GNUNET_i2s(peer));
2662     return;
2663   }
2664   
2665   LOG (GNUNET_ERROR_TYPE_DEBUG,
2666        "%s: Target peer %s connected\n",
2667        GNUNET_i2s (&socket->other_peer),
2668        GNUNET_i2s (&socket->other_peer));
2669   
2670   /* Set state to INIT */
2671   socket->state = STATE_INIT;
2672
2673   /* Send HELLO message */
2674   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2675   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2676   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2677   queue_message (socket,
2678                  message,
2679                  &set_state_hello_wait,
2680                  NULL);
2681
2682   /* Call open callback */
2683   if (NULL == socket->open_cb)
2684   {
2685     LOG (GNUNET_ERROR_TYPE_DEBUG,
2686          "STREAM_open callback is NULL\n");
2687   }
2688 }
2689
2690
2691 /**
2692  * Function called when our target peer is disconnected from our tunnel
2693  *
2694  * @param cls the socket associated which this tunnel
2695  * @param peer the peer identity of the target
2696  */
2697 static void
2698 mesh_peer_disconnect_callback (void *cls,
2699                                const struct GNUNET_PeerIdentity *peer)
2700 {
2701   struct GNUNET_STREAM_Socket *socket=cls;
2702   
2703   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2704   LOG (GNUNET_ERROR_TYPE_DEBUG,
2705        "%s: Other peer %s disconnected \n",
2706        GNUNET_i2s (&socket->other_peer),
2707        GNUNET_i2s (&socket->other_peer));
2708 }
2709
2710
2711 /**
2712  * Method called whenever a peer creates a tunnel to us
2713  *
2714  * @param cls closure
2715  * @param tunnel new handle to the tunnel
2716  * @param initiator peer that started the tunnel
2717  * @param atsi performance information for the tunnel
2718  * @return initial tunnel context for the tunnel
2719  *         (can be NULL -- that's not an error)
2720  */
2721 static void *
2722 new_tunnel_notify (void *cls,
2723                    struct GNUNET_MESH_Tunnel *tunnel,
2724                    const struct GNUNET_PeerIdentity *initiator,
2725                    const struct GNUNET_ATS_Information *atsi)
2726 {
2727   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2728   struct GNUNET_STREAM_Socket *socket;
2729
2730   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2731      from the same peer again until the socket is closed */
2732
2733   if (GNUNET_NO == lsocket->listening)
2734   {
2735     LOG (GNUNET_ERROR_TYPE_DEBUG,
2736          "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2737          GNUNET_i2s (&socket->other_peer),
2738          GNUNET_i2s (&socket->other_peer));
2739     GNUNET_MESH_tunnel_destroy (tunnel);
2740     return NULL;
2741   }
2742   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2743   socket->other_peer = *initiator;
2744   socket->tunnel = tunnel;
2745   socket->session_id = 0;       /* FIXME */
2746   socket->state = STATE_INIT;
2747   socket->lsocket = lsocket;
2748   socket->retransmit_timeout = lsocket->retransmit_timeout;
2749   socket->testing_active = lsocket->testing_active;
2750   socket->testing_set_write_sequence_number_value =
2751     lsocket->testing_set_write_sequence_number_value;
2752     
2753   LOG (GNUNET_ERROR_TYPE_DEBUG,
2754        "%s: Peer %s initiated tunnel to us\n", 
2755        GNUNET_i2s (&socket->other_peer),
2756        GNUNET_i2s (&socket->other_peer));
2757   
2758   /* FIXME: Copy MESH handle from lsocket to socket */
2759   
2760   return socket;
2761 }
2762
2763
2764 /**
2765  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2766  * any associated state.  This function is NOT called if the client has
2767  * explicitly asked for the tunnel to be destroyed using
2768  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2769  * the tunnel.
2770  *
2771  * @param cls closure (set from GNUNET_MESH_connect)
2772  * @param tunnel connection to the other end (henceforth invalid)
2773  * @param tunnel_ctx place where local state associated
2774  *                   with the tunnel is stored
2775  */
2776 static void 
2777 tunnel_cleaner (void *cls,
2778                 const struct GNUNET_MESH_Tunnel *tunnel,
2779                 void *tunnel_ctx)
2780 {
2781   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2782
2783   if (tunnel != socket->tunnel)
2784     return;
2785
2786   GNUNET_break_op(0);
2787   LOG (GNUNET_ERROR_TYPE_DEBUG,
2788        "%s: Peer %s has terminated connection abruptly\n",
2789        GNUNET_i2s (&socket->other_peer),
2790        GNUNET_i2s (&socket->other_peer));
2791
2792   socket->status = GNUNET_STREAM_SHUTDOWN;
2793
2794   /* Clear Transmit handles */
2795   if (NULL != socket->transmit_handle)
2796   {
2797     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2798     socket->transmit_handle = NULL;
2799   }
2800   if (NULL != socket->ack_transmit_handle)
2801   {
2802     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2803     GNUNET_free (socket->ack_msg);
2804     socket->ack_msg = NULL;
2805     socket->ack_transmit_handle = NULL;
2806   }
2807   /* Stop Tasks using socket->tunnel */
2808   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2809   {
2810     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2811     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2812   }
2813   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2814   {
2815     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2816     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2817   }
2818   /* FIXME: Cancel all other tasks using socket->tunnel */
2819   socket->tunnel = NULL;
2820 }
2821
2822
2823 /**
2824  * Callback to signal timeout on lockmanager lock acquire
2825  *
2826  * @param cls the ListenSocket
2827  * @param tc the scheduler task context
2828  */
2829 static void
2830 lockmanager_acquire_timeout (void *cls, 
2831                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2832 {
2833   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2834   GNUNET_STREAM_ListenCallback listen_cb;
2835   void *listen_cb_cls;
2836
2837   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2838   listen_cb = lsocket->listen_cb;
2839   listen_cb_cls = lsocket->listen_cb_cls;
2840   GNUNET_STREAM_listen_close (lsocket);
2841   if (NULL != listen_cb)
2842     listen_cb (listen_cb_cls, NULL, NULL);  
2843 }
2844
2845
2846 /**
2847  * Callback to notify us on the status changes on app_port lock
2848  *
2849  * @param cls the ListenSocket
2850  * @param domain the domain name of the lock
2851  * @param lock the app_port
2852  * @param status the current status of the lock
2853  */
2854 static void
2855 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2856                        enum GNUNET_LOCKMANAGER_Status status)
2857 {
2858   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2859
2860   GNUNET_assert (lock == (uint32_t) lsocket->port);
2861   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2862   {
2863     lsocket->listening = GNUNET_YES;
2864     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2865     {
2866       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2867       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2868     }
2869     if (NULL == lsocket->mesh)
2870     {
2871       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2872
2873       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2874                                            RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
2875                                            lsocket, /* Closure */
2876                                            &new_tunnel_notify,
2877                                            &tunnel_cleaner,
2878                                            server_message_handlers,
2879                                            ports);
2880       GNUNET_assert (NULL != lsocket->mesh);
2881       if (NULL != lsocket->listen_ok_cb)
2882       {
2883         (void) lsocket->listen_ok_cb ();
2884       }
2885     }
2886   }
2887   if (GNUNET_LOCKMANAGER_RELEASE == status)
2888     lsocket->listening = GNUNET_NO;
2889 }
2890
2891
2892 /*****************/
2893 /* API functions */
2894 /*****************/
2895
2896
2897 /**
2898  * Tries to open a stream to the target peer
2899  *
2900  * @param cfg configuration to use
2901  * @param target the target peer to which the stream has to be opened
2902  * @param app_port the application port number which uniquely identifies this
2903  *            stream
2904  * @param open_cb this function will be called after stream has be established 
2905  * @param open_cb_cls the closure for open_cb
2906  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2907  * @return if successful it returns the stream socket; NULL if stream cannot be
2908  *         opened 
2909  */
2910 struct GNUNET_STREAM_Socket *
2911 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2912                     const struct GNUNET_PeerIdentity *target,
2913                     GNUNET_MESH_ApplicationType app_port,
2914                     GNUNET_STREAM_OpenCallback open_cb,
2915                     void *open_cb_cls,
2916                     ...)
2917 {
2918   struct GNUNET_STREAM_Socket *socket;
2919   enum GNUNET_STREAM_Option option;
2920   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2921   va_list vargs;                /* Variable arguments */
2922
2923   LOG (GNUNET_ERROR_TYPE_DEBUG,
2924        "%s\n", __func__);
2925   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2926   socket->other_peer = *target;
2927   socket->open_cb = open_cb;
2928   socket->open_cls = open_cb_cls;
2929   /* Set defaults */
2930   socket->retransmit_timeout = 
2931     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2932   socket->testing_active = GNUNET_NO;
2933   va_start (vargs, open_cb_cls); /* Parse variable args */
2934   do {
2935     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2936     switch (option)
2937     {
2938     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2939       /* Expect struct GNUNET_TIME_Relative */
2940       socket->retransmit_timeout = va_arg (vargs,
2941                                            struct GNUNET_TIME_Relative);
2942       break;
2943     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2944       socket->testing_active = GNUNET_YES;
2945       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2946                                                                 uint32_t);
2947       break;
2948     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2949       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2950       break;
2951     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2952       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2953       break;
2954     case GNUNET_STREAM_OPTION_END:
2955       break;
2956     }
2957   } while (GNUNET_STREAM_OPTION_END != option);
2958   va_end (vargs);               /* End of variable args parsing */
2959   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2960                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
2961                                       socket, /* cls */
2962                                       NULL, /* No inbound tunnel handler */
2963                                       NULL, /* No in-tunnel cleaner */
2964                                       client_message_handlers,
2965                                       ports); /* We don't get inbound tunnels */
2966   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2967   {
2968     GNUNET_free (socket);
2969     return NULL;
2970   }
2971
2972   /* Now create the mesh tunnel to target */
2973   LOG (GNUNET_ERROR_TYPE_DEBUG,
2974        "Creating MESH Tunnel\n");
2975   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2976                                               NULL, /* Tunnel context */
2977                                               &mesh_peer_connect_callback,
2978                                               &mesh_peer_disconnect_callback,
2979                                               socket);
2980   GNUNET_assert (NULL != socket->tunnel);
2981   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2982                                         &socket->other_peer);
2983   
2984   LOG (GNUNET_ERROR_TYPE_DEBUG,
2985        "%s() END\n", __func__);
2986   return socket;
2987 }
2988
2989
2990 /**
2991  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2992  *
2993  * @param socket the stream socket
2994  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2995  * @param completion_cb the callback that will be called upon successful
2996  *          shutdown of given operation
2997  * @param completion_cls the closure for the completion callback
2998  * @return the shutdown handle
2999  */
3000 struct GNUNET_STREAM_ShutdownHandle *
3001 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3002                         int operation,
3003                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3004                         void *completion_cls)
3005 {
3006   struct GNUNET_STREAM_ShutdownHandle *handle;
3007   struct GNUNET_STREAM_MessageHeader *msg;
3008   
3009   GNUNET_assert (NULL == socket->shutdown_handle);
3010
3011   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3012   handle->socket = socket;
3013   handle->completion_cb = completion_cb;
3014   handle->completion_cls = completion_cls;
3015   socket->shutdown_handle = handle;
3016
3017   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3018   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3019   switch (operation)
3020   {
3021   case SHUT_RD:
3022     handle->operation = SHUT_RD;
3023     if (NULL != socket->read_handle)
3024       LOG (GNUNET_ERROR_TYPE_WARNING,
3025            "Existing read handle should be cancelled before shutting"
3026            " down reading\n");
3027     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3028     queue_message (socket,
3029                    msg,
3030                    &set_state_receive_close_wait,
3031                    NULL);
3032     break;
3033   case SHUT_WR:
3034     handle->operation = SHUT_WR;
3035     if (NULL != socket->write_handle)
3036       LOG (GNUNET_ERROR_TYPE_WARNING,
3037            "Existing write handle should be cancelled before shutting"
3038            " down writing\n");
3039     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3040     queue_message (socket,
3041                    msg,
3042                    &set_state_transmit_close_wait,
3043                    NULL);
3044     break;
3045   case SHUT_RDWR:
3046     handle->operation = SHUT_RDWR;
3047     if (NULL != socket->write_handle)
3048       LOG (GNUNET_ERROR_TYPE_WARNING,
3049            "Existing write handle should be cancelled before shutting"
3050            " down writing\n");
3051     if (NULL != socket->read_handle)
3052       LOG (GNUNET_ERROR_TYPE_WARNING,
3053            "Existing read handle should be cancelled before shutting"
3054            " down reading\n");
3055     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3056     queue_message (socket,
3057                    msg,
3058                    &set_state_close_wait,
3059                    NULL);
3060     break;
3061   default:
3062     LOG (GNUNET_ERROR_TYPE_WARNING,
3063          "GNUNET_STREAM_shutdown called with invalid value for "
3064          "parameter operation -- Ignoring\n");
3065     GNUNET_free (msg);
3066     GNUNET_free (handle);
3067     return NULL;
3068   }
3069   handle->close_msg_retransmission_task_id =
3070     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3071                                   &close_msg_retransmission_task,
3072                                   handle);
3073   return handle;
3074 }
3075
3076
3077 /**
3078  * Cancels a pending shutdown
3079  *
3080  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3081  */
3082 void
3083 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3084 {
3085   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3086     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3087   GNUNET_free (handle);
3088 }
3089
3090
3091 /**
3092  * Closes the stream
3093  *
3094  * @param socket the stream socket
3095  */
3096 void
3097 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3098 {
3099   struct MessageQueue *head;
3100
3101   if (NULL != socket->read_handle)
3102   {
3103     LOG (GNUNET_ERROR_TYPE_WARNING,
3104          "Closing STREAM socket when a read handle is pending\n");
3105   }
3106   if (NULL != socket->write_handle)
3107   {
3108     LOG (GNUNET_ERROR_TYPE_WARNING,
3109          "Closing STREAM socket when a write handle is pending\n");
3110     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3111     //socket->write_handle = NULL;
3112   }
3113
3114   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3115   {
3116     /* socket closed with read task pending!? */
3117     GNUNET_break (0);
3118     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3119     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3120   }
3121   
3122   /* Terminate the ack'ing tasks if they are still present */
3123   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3124   {
3125     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3126     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3127   }
3128
3129   /* Clear Transmit handles */
3130   if (NULL != socket->transmit_handle)
3131   {
3132     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3133     socket->transmit_handle = NULL;
3134   }
3135   if (NULL != socket->ack_transmit_handle)
3136   {
3137     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3138     GNUNET_free (socket->ack_msg);
3139     socket->ack_msg = NULL;
3140     socket->ack_transmit_handle = NULL;
3141   }
3142
3143   /* Clear existing message queue */
3144   while (NULL != (head = socket->queue_head)) {
3145     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3146                                  socket->queue_tail,
3147                                  head);
3148     GNUNET_free (head->message);
3149     GNUNET_free (head);
3150   }
3151
3152   /* Close associated tunnel */
3153   if (NULL != socket->tunnel)
3154   {
3155     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3156     socket->tunnel = NULL;
3157   }
3158
3159   /* Close mesh connection */
3160   if (NULL != socket->mesh && NULL == socket->lsocket)
3161   {
3162     GNUNET_MESH_disconnect (socket->mesh);
3163     socket->mesh = NULL;
3164   }
3165   
3166   /* Release receive buffer */
3167   if (NULL != socket->receive_buffer)
3168   {
3169     GNUNET_free (socket->receive_buffer);
3170   }
3171
3172   GNUNET_free (socket);
3173 }
3174
3175
3176 /**
3177  * Listens for stream connections for a specific application ports
3178  *
3179  * @param cfg the configuration to use
3180  * @param app_port the application port for which new streams will be accepted
3181  * @param listen_cb this function will be called when a peer tries to establish
3182  *            a stream with us
3183  * @param listen_cb_cls closure for listen_cb
3184  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3185  * @return listen socket, NULL for any error
3186  */
3187 struct GNUNET_STREAM_ListenSocket *
3188 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3189                       GNUNET_MESH_ApplicationType app_port,
3190                       GNUNET_STREAM_ListenCallback listen_cb,
3191                       void *listen_cb_cls,
3192                       ...)
3193 {
3194   /* FIXME: Add variable args for passing configration options? */
3195   struct GNUNET_STREAM_ListenSocket *lsocket;
3196   struct GNUNET_TIME_Relative listen_timeout;
3197   enum GNUNET_STREAM_Option option;
3198   va_list vargs;
3199
3200   GNUNET_assert (NULL != listen_cb);
3201   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3202   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3203   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3204   if (NULL == lsocket->lockmanager)
3205   {
3206     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3207     GNUNET_free (lsocket);
3208     return NULL;
3209   }
3210   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3211   /* Set defaults */
3212   lsocket->retransmit_timeout = 
3213     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3214   lsocket->testing_active = GNUNET_NO;
3215   lsocket->listen_ok_cb = NULL;
3216   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3217   va_start (vargs, listen_cb_cls);
3218   do {
3219     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3220     switch (option)
3221     {
3222     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3223       lsocket->retransmit_timeout = va_arg (vargs,
3224                                             struct GNUNET_TIME_Relative);
3225       break;
3226     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3227       lsocket->testing_active = GNUNET_YES;
3228       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3229                                                                  uint32_t);
3230       break;
3231     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3232       listen_timeout = GNUNET_TIME_relative_multiply
3233         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3234       break;
3235     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3236       lsocket->listen_ok_cb = va_arg (vargs,
3237                                       GNUNET_STREAM_ListenSuccessCallback);
3238       break;
3239     case GNUNET_STREAM_OPTION_END:
3240       break;
3241     }
3242   } while (GNUNET_STREAM_OPTION_END != option);
3243   va_end (vargs);
3244   lsocket->port = app_port;
3245   lsocket->listen_cb = listen_cb;
3246   lsocket->listen_cb_cls = listen_cb_cls;
3247   lsocket->locking_request = 
3248     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3249                                      (uint32_t) lsocket->port,
3250                                      &lock_status_change_cb, lsocket);
3251   lsocket->lockmanager_acquire_timeout_task =
3252     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3253                                   &lockmanager_acquire_timeout, lsocket);
3254   return lsocket;
3255 }
3256
3257
3258 /**
3259  * Closes the listen socket
3260  *
3261  * @param lsocket the listen socket
3262  */
3263 void
3264 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3265 {
3266   /* Close MESH connection */
3267   if (NULL != lsocket->mesh)
3268     GNUNET_MESH_disconnect (lsocket->mesh);
3269   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3270   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3271     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3272   if (NULL != lsocket->locking_request)
3273     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3274   if (NULL != lsocket->lockmanager)
3275     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3276   GNUNET_free (lsocket);
3277 }
3278
3279
3280 /**
3281  * Tries to write the given data to the stream. The maximum size of data that
3282  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3283  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3284  * violation, however only the said number of maximum bytes will be written.
3285  *
3286  * @param socket the socket representing a stream
3287  * @param data the data buffer from where the data is written into the stream
3288  * @param size the number of bytes to be written from the data buffer
3289  * @param timeout the timeout period
3290  * @param write_cont the function to call upon writing some bytes into the
3291  *          stream 
3292  * @param write_cont_cls the closure
3293  *
3294  * @return handle to cancel the operation; if a previous write is pending or
3295  *           the stream has been shutdown for this operation then write_cont is
3296  *           immediately called and NULL is returned.
3297  */
3298 struct GNUNET_STREAM_IOWriteHandle *
3299 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3300                      const void *data,
3301                      size_t size,
3302                      struct GNUNET_TIME_Relative timeout,
3303                      GNUNET_STREAM_CompletionContinuation write_cont,
3304                      void *write_cont_cls)
3305 {
3306   unsigned int num_needed_packets;
3307   unsigned int packet;
3308   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3309   uint32_t packet_size;
3310   uint32_t payload_size;
3311   struct GNUNET_STREAM_DataMessage *data_msg;
3312   const void *sweep;
3313   struct GNUNET_TIME_Relative ack_deadline;
3314
3315   LOG (GNUNET_ERROR_TYPE_DEBUG,
3316        "%s\n", __func__);
3317
3318   /* Return NULL if there is already a write request pending */
3319   if (NULL != socket->write_handle)
3320   {
3321     GNUNET_break (0);
3322     return NULL;
3323   }
3324
3325   switch (socket->state)
3326   {
3327   case STATE_TRANSMIT_CLOSED:
3328   case STATE_TRANSMIT_CLOSE_WAIT:
3329   case STATE_CLOSED:
3330   case STATE_CLOSE_WAIT:
3331     if (NULL != write_cont)
3332       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3333     LOG (GNUNET_ERROR_TYPE_DEBUG,
3334          "%s() END\n", __func__);
3335     return NULL;
3336   case STATE_INIT:
3337   case STATE_LISTEN:
3338   case STATE_HELLO_WAIT:
3339     if (NULL != write_cont)
3340       /* FIXME: GNUNET_STREAM_SYSERR?? */
3341       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3342     LOG (GNUNET_ERROR_TYPE_DEBUG,
3343          "%s() END\n", __func__);
3344     return NULL;
3345   case STATE_ESTABLISHED:
3346   case STATE_RECEIVE_CLOSED:
3347   case STATE_RECEIVE_CLOSE_WAIT:
3348     break;
3349   }
3350
3351   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3352     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3353   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3354   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3355   io_handle->socket = socket;
3356   io_handle->write_cont = write_cont;
3357   io_handle->write_cont_cls = write_cont_cls;
3358   io_handle->size = size;
3359   sweep = data;
3360   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3361      determined from RTT */
3362   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3363   /* Divide the given buffer into packets for sending */
3364   for (packet=0; packet < num_needed_packets; packet++)
3365   {
3366     if ((packet + 1) * max_payload_size < size) 
3367     {
3368       payload_size = max_payload_size;
3369       packet_size = MAX_PACKET_SIZE;
3370     }
3371     else 
3372     {
3373       payload_size = size - packet * max_payload_size;
3374       packet_size =  payload_size + sizeof (struct
3375                                             GNUNET_STREAM_DataMessage); 
3376     }
3377     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3378     io_handle->messages[packet]->header.header.size = htons (packet_size);
3379     io_handle->messages[packet]->header.header.type =
3380       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3381     io_handle->messages[packet]->sequence_number =
3382       htonl (socket->write_sequence_number++);
3383     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3384
3385     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3386        determined from RTT */
3387     io_handle->messages[packet]->ack_deadline =
3388       GNUNET_TIME_relative_hton (ack_deadline);
3389     data_msg = io_handle->messages[packet];
3390     /* Copy data from given buffer to the packet */
3391     memcpy (&data_msg[1],
3392             sweep,
3393             payload_size);
3394     sweep += payload_size;
3395     socket->write_offset += payload_size;
3396   }
3397   socket->write_handle = io_handle;
3398   write_data (socket);
3399
3400   LOG (GNUNET_ERROR_TYPE_DEBUG,
3401        "%s() END\n", __func__);
3402
3403   return io_handle;
3404 }
3405
3406
3407 /**
3408  * Tries to read data from the stream.
3409  *
3410  * @param socket the socket representing a stream
3411  * @param timeout the timeout period
3412  * @param proc function to call with data (once only)
3413  * @param proc_cls the closure for proc
3414  *
3415  * @return handle to cancel the operation; if the stream has been shutdown for
3416  *           this type of opeartion then the DataProcessor is immediately
3417  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3418  */
3419 struct GNUNET_STREAM_IOReadHandle *
3420 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3421                     struct GNUNET_TIME_Relative timeout,
3422                     GNUNET_STREAM_DataProcessor proc,
3423                     void *proc_cls)
3424 {
3425   struct GNUNET_STREAM_IOReadHandle *read_handle;
3426   
3427   LOG (GNUNET_ERROR_TYPE_DEBUG,
3428        "%s: %s()\n", 
3429        GNUNET_i2s (&socket->other_peer),
3430        __func__);
3431
3432   /* Return NULL if there is already a read handle; the user has to cancel that
3433      first before continuing or has to wait until it is completed */
3434   if (NULL != socket->read_handle) return NULL;
3435
3436   GNUNET_assert (NULL != proc);
3437
3438   switch (socket->state)
3439   {
3440   case STATE_RECEIVE_CLOSED:
3441   case STATE_RECEIVE_CLOSE_WAIT:
3442   case STATE_CLOSED:
3443   case STATE_CLOSE_WAIT:
3444     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3445     LOG (GNUNET_ERROR_TYPE_DEBUG,
3446          "%s: %s() END\n",
3447          GNUNET_i2s (&socket->other_peer),
3448          __func__);
3449     return NULL;
3450   default:
3451     break;
3452   }
3453
3454   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3455   read_handle->proc = proc;
3456   read_handle->proc_cls = proc_cls;
3457   socket->read_handle = read_handle;
3458
3459   /* Check if we have a packet at bitmap 0 */
3460   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3461                                           0))
3462   {
3463     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3464                                                      socket);
3465    
3466   }
3467   
3468   /* Setup the read timeout task */
3469   socket->read_io_timeout_task_id =
3470     GNUNET_SCHEDULER_add_delayed (timeout,
3471                                   &read_io_timeout,
3472                                   socket);
3473   LOG (GNUNET_ERROR_TYPE_DEBUG,
3474        "%s: %s() END\n",
3475        GNUNET_i2s (&socket->other_peer),
3476        __func__);
3477   return read_handle;
3478 }
3479
3480
3481 /**
3482  * Cancel pending write operation.
3483  *
3484  * @param ioh handle to operation to cancel
3485  */
3486 void
3487 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3488 {
3489   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3490   unsigned int packet;
3491
3492   GNUNET_assert (NULL != socket->write_handle);
3493   GNUNET_assert (socket->write_handle == ioh);
3494
3495   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3496   {
3497     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3498     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3499   }
3500
3501   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3502   {
3503     if (NULL == ioh->messages[packet]) break;
3504     GNUNET_free (ioh->messages[packet]);
3505   }
3506       
3507   GNUNET_free (socket->write_handle);
3508   socket->write_handle = NULL;
3509 }
3510
3511
3512 /**
3513  * Cancel pending read operation.
3514  *
3515  * @param ioh handle to operation to cancel
3516  */
3517 void
3518 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3519 {
3520   // FIXME: do stuff
3521 }
3522
3523 /* end of stream_api.c */