e27f4df1fab00f96498986d78084f25ed30286e0
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 #define LOG(kind,...)                                   \
46   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
47
48 #define TIME_REL_SECS(sec) \
49   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
50
51 /**
52  * The maximum packet size of a stream packet
53  */
54 #define MAX_PACKET_SIZE 512//64000
55
56 /**
57  * Receive buffer
58  */
59 #define RECEIVE_BUFFER_SIZE 4096000
60
61 /**
62  * The maximum payload a data message packet can carry
63  */
64 static const size_t max_payload_size = 
65   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
66
67 /**
68  * states in the Protocol
69  */
70 enum State
71   {
72     /**
73      * Client initialization state
74      */
75     STATE_INIT,
76
77     /**
78      * Listener initialization state 
79      */
80     STATE_LISTEN,
81
82     /**
83      * Pre-connection establishment state
84      */
85     STATE_HELLO_WAIT,
86
87     /**
88      * State where a connection has been established
89      */
90     STATE_ESTABLISHED,
91
92     /**
93      * State where the socket is closed on our side and waiting to be ACK'ed
94      */
95     STATE_RECEIVE_CLOSE_WAIT,
96
97     /**
98      * State where the socket is closed for reading
99      */
100     STATE_RECEIVE_CLOSED,
101
102     /**
103      * State where the socket is closed on our side and waiting to be ACK'ed
104      */
105     STATE_TRANSMIT_CLOSE_WAIT,
106
107     /**
108      * State where the socket is closed for writing
109      */
110     STATE_TRANSMIT_CLOSED,
111
112     /**
113      * State where the socket is closed on our side and waiting to be ACK'ed
114      */
115     STATE_CLOSE_WAIT,
116
117     /**
118      * State where the socket is closed
119      */
120     STATE_CLOSED 
121   };
122
123
124 /**
125  * Functions of this type are called when a message is written
126  *
127  * @param cls the closure from queue_message
128  * @param socket the socket the written message was bound to
129  */
130 typedef void (*SendFinishCallback) (void *cls,
131                                     struct GNUNET_STREAM_Socket *socket);
132
133
134 /**
135  * The send message queue
136  */
137 struct MessageQueue
138 {
139   /**
140    * The message
141    */
142   struct GNUNET_STREAM_MessageHeader *message;
143
144   /**
145    * Callback to be called when the message is sent
146    */
147   SendFinishCallback finish_cb;
148
149   /**
150    * The closure for finish_cb
151    */
152   void *finish_cb_cls;
153
154   /**
155    * The next message in queue. Should be NULL in the last message
156    */
157   struct MessageQueue *next;
158
159   /**
160    * The next message in queue. Should be NULL in the first message
161    */
162   struct MessageQueue *prev;
163 };
164
165
166 /**
167  * The STREAM Socket Handler
168  */
169 struct GNUNET_STREAM_Socket
170 {
171   /**
172    * Retransmission timeout
173    */
174   struct GNUNET_TIME_Relative retransmit_timeout;
175
176   /**
177    * The Acknowledgement Bitmap
178    */
179   GNUNET_STREAM_AckBitmap ack_bitmap;
180
181   /**
182    * Time when the Acknowledgement was queued
183    */
184   struct GNUNET_TIME_Absolute ack_time_registered;
185
186   /**
187    * Queued Acknowledgement deadline
188    */
189   struct GNUNET_TIME_Relative ack_time_deadline;
190
191   /**
192    * The mesh handle
193    */
194   struct GNUNET_MESH_Handle *mesh;
195
196   /**
197    * The mesh tunnel handle
198    */
199   struct GNUNET_MESH_Tunnel *tunnel;
200
201   /**
202    * Stream open closure
203    */
204   void *open_cls;
205
206   /**
207    * Stream open callback
208    */
209   GNUNET_STREAM_OpenCallback open_cb;
210
211   /**
212    * The current transmit handle (if a pending transmit request exists)
213    */
214   struct GNUNET_MESH_TransmitHandle *transmit_handle;
215
216   /**
217    * The current act transmit handle (if a pending ack transmit request exists)
218    */
219   struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
220
221   /**
222    * Pointer to the current ack message using in ack_task
223    */
224   struct GNUNET_STREAM_AckMessage *ack_msg;
225
226   /**
227    * The current message associated with the transmit handle
228    */
229   struct MessageQueue *queue_head;
230
231   /**
232    * The queue tail, should always point to the last message in queue
233    */
234   struct MessageQueue *queue_tail;
235
236   /**
237    * The write IO_handle associated with this socket
238    */
239   struct GNUNET_STREAM_IOWriteHandle *write_handle;
240
241   /**
242    * The read IO_handle associated with this socket
243    */
244   struct GNUNET_STREAM_IOReadHandle *read_handle;
245
246   /**
247    * The shutdown handle associated with this socket
248    */
249   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
250
251   /**
252    * Buffer for storing received messages
253    */
254   void *receive_buffer;
255
256   /**
257    * The listen socket from which this socket is derived. Should be NULL if it
258    * is not a derived socket
259    */
260   struct GNUNET_STREAM_ListenSocket *lsocket;
261
262   /**
263    * The peer identity of the peer at the other end of the stream
264    */
265   struct GNUNET_PeerIdentity other_peer;
266
267   /**
268    * Task identifier for the read io timeout task
269    */
270   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
271
272   /**
273    * Task identifier for retransmission task after timeout
274    */
275   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
276
277   /**
278    * The task for sending timely Acks
279    */
280   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
281
282   /**
283    * Task scheduled to continue a read operation.
284    */
285   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
286
287   /**
288    * The state of the protocol associated with this socket
289    */
290   enum State state;
291
292   /**
293    * The status of the socket
294    */
295   enum GNUNET_STREAM_Status status;
296
297   /**
298    * The number of previous timeouts; FIXME: currently not used
299    */
300   unsigned int retries;
301
302   /**
303    * The application port number (type: uint32_t)
304    */
305   GNUNET_MESH_ApplicationType app_port;
306
307   /**
308    * Whether testing mode is active or not
309    */
310   int testing_active;
311
312   /**
313    * The write sequence number to be set incase of testing
314    */
315   uint32_t testing_set_write_sequence_number_value;
316
317   /**
318    * The session id associated with this stream connection
319    * FIXME: Not used currently, may be removed
320    */
321   uint32_t session_id;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363 };
364
365
366 /**
367  * A socket for listening
368  */
369 struct GNUNET_STREAM_ListenSocket
370 {
371   /**
372    * The mesh handle
373    */
374   struct GNUNET_MESH_Handle *mesh;
375
376   /**
377    * Our configuration
378    */
379   struct GNUNET_CONFIGURATION_Handle *cfg;
380
381   /**
382    * Handle to the lock manager service
383    */
384   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
385
386   /**
387    * The active LockingRequest from lockmanager
388    */
389   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
390
391   /**
392    * The callback function which is called after successful opening socket
393    */
394   GNUNET_STREAM_ListenCallback listen_cb;
395
396   /**
397    * The call back closure
398    */
399   void *listen_cb_cls;
400
401   /**
402    * The service port
403    */
404   GNUNET_MESH_ApplicationType port;
405   
406   /**
407    * The id of the lockmanager timeout task
408    */
409   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
410
411   /**
412    * The retransmit timeout
413    */
414   struct GNUNET_TIME_Relative retransmit_timeout;
415   
416   /**
417    * Listen enabled?
418    */
419   int listening;
420
421   /**
422    * Whether testing mode is active or not
423    */
424   int testing_active;
425
426   /**
427    * The write sequence number to be set incase of testing
428    */
429   uint32_t testing_set_write_sequence_number_value;
430 };
431
432
433 /**
434  * The IO Write Handle
435  */
436 struct GNUNET_STREAM_IOWriteHandle
437 {
438   /**
439    * The socket to which this write handle is associated
440    */
441   struct GNUNET_STREAM_Socket *socket;
442
443   /**
444    * The packet_buffers associated with this Handle
445    */
446   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
447
448   /**
449    * The write continuation callback
450    */
451   GNUNET_STREAM_CompletionContinuation write_cont;
452
453   /**
454    * Write continuation closure
455    */
456   void *write_cont_cls;
457
458   /**
459    * The bitmap of this IOHandle; Corresponding bit for a message is set when
460    * it has been acknowledged by the receiver
461    */
462   GNUNET_STREAM_AckBitmap ack_bitmap;
463
464   /**
465    * Number of bytes in this write handle
466    */
467   size_t size;
468 };
469
470
471 /**
472  * The IO Read Handle
473  */
474 struct GNUNET_STREAM_IOReadHandle
475 {
476   /**
477    * Callback for the read processor
478    */
479   GNUNET_STREAM_DataProcessor proc;
480
481   /**
482    * The closure pointer for the read processor callback
483    */
484   void *proc_cls;
485 };
486
487
488 /**
489  * Handle for Shutdown
490  */
491 struct GNUNET_STREAM_ShutdownHandle
492 {
493   /**
494    * The socket associated with this shutdown handle
495    */
496   struct GNUNET_STREAM_Socket *socket;
497
498   /**
499    * Shutdown completion callback
500    */
501   GNUNET_STREAM_ShutdownCompletion completion_cb;
502
503   /**
504    * Closure for completion callback
505    */
506   void *completion_cls;
507
508   /**
509    * Close message retransmission task id
510    */
511   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
512
513   /**
514    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
515    */
516   int operation;  
517 };
518
519
520 /**
521  * Default value in seconds for various timeouts
522  */
523 static const unsigned int default_timeout = 10;
524
525 /**
526  * The domain name for locks we use here
527  */
528 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
529
530
531 /**
532  * Callback function for sending queued message
533  *
534  * @param cls closure the socket
535  * @param size number of bytes available in buf
536  * @param buf where the callee should write the message
537  * @return number of bytes written to buf
538  */
539 static size_t
540 send_message_notify (void *cls, size_t size, void *buf)
541 {
542   struct GNUNET_STREAM_Socket *socket = cls;
543   struct MessageQueue *head;
544   size_t ret;
545
546   socket->transmit_handle = NULL; /* Remove the transmit handle */
547   head = socket->queue_head;
548   if (NULL == head)
549     return 0; /* just to be safe */
550   if (0 == size)                /* request timed out */
551   {
552     socket->retries++;
553     LOG (GNUNET_ERROR_TYPE_DEBUG,
554          "%s: Message sending timed out. Retry %d \n",
555          GNUNET_i2s (&socket->other_peer),
556          socket->retries);
557     socket->transmit_handle = 
558       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
559                                          0, /* Corking */
560                                          1, /* Priority */
561                                          /* FIXME: exponential backoff */
562                                          socket->retransmit_timeout,
563                                          &socket->other_peer,
564                                          ntohs (head->message->header.size),
565                                          &send_message_notify,
566                                          socket);
567     return 0;
568   }
569
570   ret = ntohs (head->message->header.size);
571   GNUNET_assert (size >= ret);
572   memcpy (buf, head->message, ret);
573   if (NULL != head->finish_cb)
574   {
575     head->finish_cb (head->finish_cb_cls, socket);
576   }
577   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
578                                socket->queue_tail,
579                                head);
580   GNUNET_free (head->message);
581   GNUNET_free (head);
582   head = socket->queue_head;
583   if (NULL != head)    /* more pending messages to send */
584   {
585     socket->retries = 0;
586     socket->transmit_handle = 
587       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
588                                          0, /* Corking */
589                                          1, /* Priority */
590                                          /* FIXME: exponential backoff */
591                                          socket->retransmit_timeout,
592                                          &socket->other_peer,
593                                          ntohs (head->message->header.size),
594                                          &send_message_notify,
595                                          socket);
596   }
597   return ret;
598 }
599
600
601 /**
602  * Queues a message for sending using the mesh connection of a socket
603  *
604  * @param socket the socket whose mesh connection is used
605  * @param message the message to be sent
606  * @param finish_cb the callback to be called when the message is sent
607  * @param finish_cb_cls the closure for the callback
608  */
609 static void
610 queue_message (struct GNUNET_STREAM_Socket *socket,
611                struct GNUNET_STREAM_MessageHeader *message,
612                SendFinishCallback finish_cb,
613                void *finish_cb_cls)
614 {
615   struct MessageQueue *queue_entity;
616
617   GNUNET_assert 
618     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
619      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
620
621   LOG (GNUNET_ERROR_TYPE_DEBUG,
622        "%s: Queueing message of type %d and size %d\n",
623        GNUNET_i2s (&socket->other_peer),
624        ntohs (message->header.type),
625        ntohs (message->header.size));
626   GNUNET_assert (NULL != message);
627   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
628   queue_entity->message = message;
629   queue_entity->finish_cb = finish_cb;
630   queue_entity->finish_cb_cls = finish_cb_cls;
631   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
632                                     socket->queue_tail,
633                                     queue_entity);
634   if (NULL == socket->transmit_handle)
635   {
636     socket->retries = 0;
637     socket->transmit_handle = 
638       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
639                                          0, /* Corking */
640                                          1, /* Priority */
641                                          socket->retransmit_timeout,
642                                          &socket->other_peer,
643                                          ntohs (message->header.size),
644                                          &send_message_notify,
645                                          socket);
646   }
647 }
648
649
650 /**
651  * Copies a message and queues it for sending using the mesh connection of
652  * given socket 
653  *
654  * @param socket the socket whose mesh connection is used
655  * @param message the message to be sent
656  * @param finish_cb the callback to be called when the message is sent
657  * @param finish_cb_cls the closure for the callback
658  */
659 static void
660 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
661                         const struct GNUNET_STREAM_MessageHeader *message,
662                         SendFinishCallback finish_cb,
663                         void *finish_cb_cls)
664 {
665   struct GNUNET_STREAM_MessageHeader *msg_copy;
666   uint16_t size;
667   
668   size = ntohs (message->header.size);
669   msg_copy = GNUNET_malloc (size);
670   memcpy (msg_copy, message, size);
671   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
672 }
673
674
675 /**
676  * Callback function for sending ack message
677  *
678  * @param cls closure the ACK message created in ack_task
679  * @param size number of bytes available in buffer
680  * @param buf where the callee should write the message
681  * @return number of bytes written to buf
682  */
683 static size_t
684 send_ack_notify (void *cls, size_t size, void *buf)
685 {
686   struct GNUNET_STREAM_Socket *socket = cls;
687
688   if (0 == size)
689   {
690     LOG (GNUNET_ERROR_TYPE_DEBUG,
691          "%s called with size 0\n", __func__);
692     return 0;
693   }
694   GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
695   
696   size = ntohs (socket->ack_msg->header.header.size);
697   memcpy (buf, socket->ack_msg, size);
698   
699   GNUNET_free (socket->ack_msg);
700   socket->ack_msg = NULL;
701   socket->ack_transmit_handle = NULL;
702   return size;
703 }
704
705
706 /**
707  * Writes data using the given socket. The amount of data written is limited by
708  * the receiver_window_size
709  *
710  * @param socket the socket to use
711  */
712 static void 
713 write_data (struct GNUNET_STREAM_Socket *socket);
714
715
716 /**
717  * Task for retransmitting data messages if they aren't ACK before their ack
718  * deadline 
719  *
720  * @param cls the socket
721  * @param tc the Task context
722  */
723 static void
724 retransmission_timeout_task (void *cls,
725                              const struct GNUNET_SCHEDULER_TaskContext *tc)
726 {
727   struct GNUNET_STREAM_Socket *socket = cls;
728   
729   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
730     return;
731
732   LOG (GNUNET_ERROR_TYPE_DEBUG,
733        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
734   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
735   write_data (socket);
736 }
737
738
739 /**
740  * Task for sending ACK message
741  *
742  * @param cls the socket
743  * @param tc the Task context
744  */
745 static void
746 ack_task (void *cls,
747           const struct GNUNET_SCHEDULER_TaskContext *tc)
748 {
749   struct GNUNET_STREAM_Socket *socket = cls;
750   struct GNUNET_STREAM_AckMessage *ack_msg;
751
752   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
753   {
754     return;
755   }
756   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
757   /* Create the ACK Message */
758   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
759   ack_msg->header.header.size = htons (sizeof (struct 
760                                                GNUNET_STREAM_AckMessage));
761   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
762   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
763   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
764   ack_msg->receive_window_remaining = 
765     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
766   socket->ack_msg = ack_msg;
767   /* Request MESH for sending ACK */
768   socket->ack_transmit_handle = 
769     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
770                                        0, /* Corking */
771                                        1, /* Priority */
772                                        socket->retransmit_timeout,
773                                        &socket->other_peer,
774                                        ntohs (ack_msg->header.header.size),
775                                        &send_ack_notify,
776                                        socket);
777 }
778
779
780 /**
781  * Retransmission task for shutdown messages
782  *
783  * @param cls the shutdown handle
784  * @param tc the Task Context
785  */
786 static void
787 close_msg_retransmission_task (void *cls,
788                                const struct GNUNET_SCHEDULER_TaskContext *tc)
789 {
790   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
791   struct GNUNET_STREAM_MessageHeader *msg;
792   struct GNUNET_STREAM_Socket *socket;
793
794   GNUNET_assert (NULL != shutdown_handle);
795   socket = shutdown_handle->socket;
796
797   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
798   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
799   switch (shutdown_handle->operation)
800   {
801   case SHUT_RDWR:
802     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
803     break;
804   case SHUT_RD:
805     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
806     break;
807   case SHUT_WR:
808     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
809     break;
810   default:
811     GNUNET_free (msg);
812     shutdown_handle->close_msg_retransmission_task_id = 
813       GNUNET_SCHEDULER_NO_TASK;
814     return;
815   }
816   queue_message (socket, msg, NULL, NULL);
817   shutdown_handle->close_msg_retransmission_task_id =
818     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
819                                   &close_msg_retransmission_task,
820                                   shutdown_handle);
821 }
822
823
824 /**
825  * Function to modify a bit in GNUNET_STREAM_AckBitmap
826  *
827  * @param bitmap the bitmap to modify
828  * @param bit the bit number to modify
829  * @param value GNUNET_YES to on, GNUNET_NO to off
830  */
831 static void
832 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
833                       unsigned int bit, 
834                       int value)
835 {
836   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
837   if (GNUNET_YES == value)
838     *bitmap |= (1LL << bit);
839   else
840     *bitmap &= ~(1LL << bit);
841 }
842
843
844 /**
845  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
846  *
847  * @param bitmap address of the bitmap that has to be checked
848  * @param bit the bit number to check
849  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
850  */
851 static uint8_t
852 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
853                       unsigned int bit)
854 {
855   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
856   return 0 != (*bitmap & (1LL << bit));
857 }
858
859
860 /**
861  * Writes data using the given socket. The amount of data written is limited by
862  * the receiver_window_size
863  *
864  * @param socket the socket to use
865  */
866 static void 
867 write_data (struct GNUNET_STREAM_Socket *socket)
868 {
869   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
870   int packet;                   /* Although an int, should never be negative */
871   int ack_packet;
872
873   ack_packet = -1;
874   /* Find the last acknowledged packet */
875   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
876   {      
877     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
878                                             packet))
879       ack_packet = packet;        
880     else if (NULL == io_handle->messages[packet])
881       break;
882   }
883   /* Resend packets which weren't ack'ed */
884   for (packet=0; packet < ack_packet; packet++)
885   {
886     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
887                                            packet))
888     {
889       LOG (GNUNET_ERROR_TYPE_DEBUG,
890            "%s: Placing DATA message with sequence %u in send queue\n",
891            GNUNET_i2s (&socket->other_peer),
892            ntohl (io_handle->messages[packet]->sequence_number));
893       copy_and_queue_message (socket,
894                               &io_handle->messages[packet]->header,
895                               NULL,
896                               NULL);
897     }
898   }
899   packet = ack_packet + 1;
900   /* Now send new packets if there is enough buffer space */
901   while ( (NULL != io_handle->messages[packet]) &&
902           (socket->receiver_window_available 
903            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
904           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
905   {
906     socket->receiver_window_available -= 
907       ntohs (io_handle->messages[packet]->header.header.size);
908     LOG (GNUNET_ERROR_TYPE_DEBUG,
909          "%s: Placing DATA message with sequence %u in send queue\n",
910          GNUNET_i2s (&socket->other_peer),
911          ntohl (io_handle->messages[packet]->sequence_number));
912     copy_and_queue_message (socket,
913                             &io_handle->messages[packet]->header,
914                             NULL,
915                             NULL);
916     packet++;
917   }
918   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
919     socket->retransmission_timeout_task_id = 
920       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
921                                     (GNUNET_TIME_UNIT_SECONDS, 8),
922                                     &retransmission_timeout_task,
923                                     socket);
924 }
925
926
927 /**
928  * Task for calling the read processor
929  *
930  * @param cls the socket
931  * @param tc the task context
932  */
933 static void
934 call_read_processor (void *cls,
935                      const struct GNUNET_SCHEDULER_TaskContext *tc)
936 {
937   struct GNUNET_STREAM_Socket *socket = cls;
938   size_t read_size;
939   size_t valid_read_size;
940   unsigned int packet;
941   uint32_t sequence_increase;
942   uint32_t offset_increase;
943
944   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
945   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
946     return;
947
948   if (NULL == socket->receive_buffer) 
949     return;
950
951   GNUNET_assert (NULL != socket->read_handle);
952   GNUNET_assert (NULL != socket->read_handle->proc);
953
954   /* Check the bitmap for any holes */
955   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
956   {
957     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
958                                            packet))
959       break;
960   }
961   /* We only call read processor if we have the first packet */
962   GNUNET_assert (0 < packet);
963   valid_read_size = 
964     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
965   GNUNET_assert (0 != valid_read_size);
966   /* Cancel the read_io_timeout_task */
967   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
968   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
969   /* Call the data processor */
970   LOG (GNUNET_ERROR_TYPE_DEBUG,
971        "%s: Calling read processor\n",
972        GNUNET_i2s (&socket->other_peer));
973   read_size = 
974     socket->read_handle->proc (socket->read_handle->proc_cls,
975                                socket->status,
976                                socket->receive_buffer + socket->copy_offset,
977                                valid_read_size);
978   LOG (GNUNET_ERROR_TYPE_DEBUG,
979        "%s: Read processor read %d bytes\n",
980        GNUNET_i2s (&socket->other_peer), read_size);
981   LOG (GNUNET_ERROR_TYPE_DEBUG,
982        "%s: Read processor completed successfully\n",
983        GNUNET_i2s (&socket->other_peer));
984   /* Free the read handle */
985   GNUNET_free (socket->read_handle);
986   socket->read_handle = NULL;
987   GNUNET_assert (read_size <= valid_read_size);
988   socket->copy_offset += read_size;
989   /* Determine upto which packet we can remove from the buffer */
990   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
991   {
992     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
993     { packet++; break; }
994     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
995       break;
996   }
997
998   /* If no packets can be removed we can't move the buffer */
999   if (0 == packet) return;
1000   sequence_increase = packet;
1001   LOG (GNUNET_ERROR_TYPE_DEBUG,
1002        "%s: Sequence increase after read processor completion: %u\n",
1003        GNUNET_i2s (&socket->other_peer), sequence_increase);
1004
1005   /* Shift the data in the receive buffer */
1006   socket->receive_buffer = 
1007     memmove (socket->receive_buffer,
1008              socket->receive_buffer 
1009              + socket->receive_buffer_boundaries[sequence_increase-1],
1010              socket->receive_buffer_size
1011              - socket->receive_buffer_boundaries[sequence_increase-1]);
1012   /* Shift the bitmap */
1013   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1014   /* Set read_sequence_number */
1015   socket->read_sequence_number += sequence_increase;
1016   /* Set read_offset */
1017   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1018   socket->read_offset += offset_increase;
1019   /* Fix copy_offset */
1020   GNUNET_assert (offset_increase <= socket->copy_offset);
1021   socket->copy_offset -= offset_increase;
1022   /* Fix relative boundaries */
1023   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1024   {
1025     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1026     {
1027       uint32_t ahead_buffer_boundary;
1028
1029       ahead_buffer_boundary = 
1030         socket->receive_buffer_boundaries[packet + sequence_increase];
1031       if (0 == ahead_buffer_boundary)
1032         socket->receive_buffer_boundaries[packet] = 0;
1033       else
1034       {
1035         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1036         socket->receive_buffer_boundaries[packet] = 
1037           ahead_buffer_boundary - offset_increase;
1038       }
1039     }
1040     else
1041       socket->receive_buffer_boundaries[packet] = 0;
1042   }
1043 }
1044
1045
1046 /**
1047  * Cancels the existing read io handle
1048  *
1049  * @param cls the closure from the SCHEDULER call
1050  * @param tc the task context
1051  */
1052 static void
1053 read_io_timeout (void *cls, 
1054                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1055 {
1056   struct GNUNET_STREAM_Socket *socket = cls;
1057   GNUNET_STREAM_DataProcessor proc;
1058   void *proc_cls;
1059
1060   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1061   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1062   {
1063     LOG (GNUNET_ERROR_TYPE_DEBUG,
1064          "%s: Read task timedout - Cancelling it\n",
1065          GNUNET_i2s (&socket->other_peer));
1066     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1067     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1068   }
1069   GNUNET_assert (NULL != socket->read_handle);
1070   proc = socket->read_handle->proc;
1071   proc_cls = socket->read_handle->proc_cls;
1072
1073   GNUNET_free (socket->read_handle);
1074   socket->read_handle = NULL;
1075   /* Call the read processor to signal timeout */
1076   proc (proc_cls,
1077         GNUNET_STREAM_TIMEOUT,
1078         NULL,
1079         0);
1080 }
1081
1082
1083 /**
1084  * Handler for DATA messages; Same for both client and server
1085  *
1086  * @param socket the socket through which the ack was received
1087  * @param tunnel connection to the other end
1088  * @param sender who sent the message
1089  * @param msg the data message
1090  * @param atsi performance data for the connection
1091  * @return GNUNET_OK to keep the connection open,
1092  *         GNUNET_SYSERR to close it (signal serious error)
1093  */
1094 static int
1095 handle_data (struct GNUNET_STREAM_Socket *socket,
1096              struct GNUNET_MESH_Tunnel *tunnel,
1097              const struct GNUNET_PeerIdentity *sender,
1098              const struct GNUNET_STREAM_DataMessage *msg,
1099              const struct GNUNET_ATS_Information*atsi)
1100 {
1101   const void *payload;
1102   uint32_t bytes_needed;
1103   uint32_t relative_offset;
1104   uint32_t relative_sequence_number;
1105   uint16_t size;
1106
1107   size = htons (msg->header.header.size);
1108   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1109   {
1110     GNUNET_break_op (0);
1111     return GNUNET_SYSERR;
1112   }
1113
1114   if (0 != memcmp (sender,
1115                    &socket->other_peer,
1116                    sizeof (struct GNUNET_PeerIdentity)))
1117   {
1118     LOG (GNUNET_ERROR_TYPE_DEBUG,
1119          "%s: Received DATA from non-confirming peer\n",
1120          GNUNET_i2s (&socket->other_peer));
1121     return GNUNET_YES;
1122   }
1123
1124   switch (socket->state)
1125   {
1126   case STATE_ESTABLISHED:
1127   case STATE_TRANSMIT_CLOSED:
1128   case STATE_TRANSMIT_CLOSE_WAIT:
1129       
1130     /* check if the message's sequence number is in the range we are
1131        expecting */
1132     relative_sequence_number = 
1133       ntohl (msg->sequence_number) - socket->read_sequence_number;
1134     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1135     {
1136       LOG (GNUNET_ERROR_TYPE_DEBUG,
1137            "%s: Ignoring received message with sequence number %u\n",
1138            GNUNET_i2s (&socket->other_peer),
1139            ntohl (msg->sequence_number));
1140       /* Start ACK sending task if one is not already present */
1141       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1142       {
1143         socket->ack_task_id = 
1144           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1145                                         (msg->ack_deadline),
1146                                         &ack_task,
1147                                         socket);
1148       }
1149       return GNUNET_YES;
1150     }
1151       
1152     /* Check if we have already seen this message */
1153     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1154                                             relative_sequence_number))
1155     {
1156       LOG (GNUNET_ERROR_TYPE_DEBUG,
1157            "%s: Ignoring already received message with sequence number %u\n",
1158            GNUNET_i2s (&socket->other_peer),
1159            ntohl (msg->sequence_number));
1160       /* Start ACK sending task if one is not already present */
1161       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1162       {
1163         socket->ack_task_id = 
1164           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1165                                         (msg->ack_deadline),
1166                                         &ack_task,
1167                                         socket);
1168       }
1169       return GNUNET_YES;
1170     }
1171
1172     LOG (GNUNET_ERROR_TYPE_DEBUG,
1173          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1174          GNUNET_i2s (&socket->other_peer),
1175          ntohl (msg->sequence_number),
1176          ntohs (msg->header.header.size),
1177          GNUNET_i2s (&socket->other_peer));
1178       
1179     /* Check if we have to allocate the buffer */
1180     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1181     relative_offset = ntohl (msg->offset) - socket->read_offset;
1182     bytes_needed = relative_offset + size;
1183     if (bytes_needed > socket->receive_buffer_size)
1184     {
1185       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1186       {
1187         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1188                                                  bytes_needed);
1189         socket->receive_buffer_size = bytes_needed;
1190       }
1191       else
1192       {
1193         LOG (GNUNET_ERROR_TYPE_DEBUG,
1194              "%s: Cannot accommodate packet %d as buffer is full\n",
1195              GNUNET_i2s (&socket->other_peer),
1196              ntohl (msg->sequence_number));
1197         return GNUNET_YES;
1198       }
1199     }
1200       
1201     /* Copy Data to buffer */
1202     payload = &msg[1];
1203     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1204     memcpy (socket->receive_buffer + relative_offset,
1205             payload,
1206             size);
1207     socket->receive_buffer_boundaries[relative_sequence_number] = 
1208       relative_offset + size;
1209       
1210     /* Modify the ACK bitmap */
1211     ackbitmap_modify_bit (&socket->ack_bitmap,
1212                           relative_sequence_number,
1213                           GNUNET_YES);
1214
1215     /* Start ACK sending task if one is not already present */
1216     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1217     {
1218       socket->ack_task_id = 
1219         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1220                                       (msg->ack_deadline),
1221                                       &ack_task,
1222                                       socket);
1223     }
1224
1225     if ((NULL != socket->read_handle) /* A read handle is waiting */
1226         /* There is no current read task */
1227         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1228         /* We have the first packet */
1229         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1230                                                0)))
1231     {
1232       LOG (GNUNET_ERROR_TYPE_DEBUG,
1233            "%s: Scheduling read processor\n",
1234            GNUNET_i2s (&socket->other_peer));
1235           
1236       socket->read_task_id = 
1237         GNUNET_SCHEDULER_add_now (&call_read_processor,
1238                                   socket);
1239     }
1240       
1241     break;
1242
1243   default:
1244     LOG (GNUNET_ERROR_TYPE_DEBUG,
1245          "%s: Received data message when it cannot be handled\n",
1246          GNUNET_i2s (&socket->other_peer));
1247     break;
1248   }
1249   return GNUNET_YES;
1250 }
1251
1252
1253 /**
1254  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1255  *
1256  * @param cls the socket (set from GNUNET_MESH_connect)
1257  * @param tunnel connection to the other end
1258  * @param tunnel_ctx place to store local state associated with the tunnel
1259  * @param sender who sent the message
1260  * @param message the actual message
1261  * @param atsi performance data for the connection
1262  * @return GNUNET_OK to keep the connection open,
1263  *         GNUNET_SYSERR to close it (signal serious error)
1264  */
1265 static int
1266 client_handle_data (void *cls,
1267                     struct GNUNET_MESH_Tunnel *tunnel,
1268                     void **tunnel_ctx,
1269                     const struct GNUNET_PeerIdentity *sender,
1270                     const struct GNUNET_MessageHeader *message,
1271                     const struct GNUNET_ATS_Information*atsi)
1272 {
1273   struct GNUNET_STREAM_Socket *socket = cls;
1274
1275   return handle_data (socket, 
1276                       tunnel, 
1277                       sender, 
1278                       (const struct GNUNET_STREAM_DataMessage *) message, 
1279                       atsi);
1280 }
1281
1282
1283 /**
1284  * Callback to set state to ESTABLISHED
1285  *
1286  * @param cls the closure from queue_message FIXME: document
1287  * @param socket the socket to requiring state change
1288  */
1289 static void
1290 set_state_established (void *cls,
1291                        struct GNUNET_STREAM_Socket *socket)
1292 {
1293   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1294        "%s: Attaining ESTABLISHED state\n",
1295        GNUNET_i2s (&socket->other_peer));
1296   socket->write_offset = 0;
1297   socket->read_offset = 0;
1298   socket->state = STATE_ESTABLISHED;
1299   if (NULL != socket->lsocket)
1300   {
1301     LOG (GNUNET_ERROR_TYPE_DEBUG,
1302          "%s: Calling listen callback\n",
1303          GNUNET_i2s (&socket->other_peer));
1304     if (GNUNET_SYSERR == 
1305         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1306                                     socket,
1307                                     &socket->other_peer))
1308     {
1309       socket->state = STATE_CLOSED;
1310       /* FIXME: We should close in a decent way (send RST) */
1311       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1312       GNUNET_free (socket);
1313     }
1314   }
1315   else if (NULL != socket->open_cb)
1316     socket->open_cb (socket->open_cls, socket);
1317 }
1318
1319
1320 /**
1321  * Callback to set state to HELLO_WAIT
1322  *
1323  * @param cls the closure from queue_message
1324  * @param socket the socket to requiring state change
1325  */
1326 static void
1327 set_state_hello_wait (void *cls,
1328                       struct GNUNET_STREAM_Socket *socket)
1329 {
1330   GNUNET_assert (STATE_INIT == socket->state);
1331   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1332        "%s: Attaining HELLO_WAIT state\n",
1333        GNUNET_i2s (&socket->other_peer));
1334   socket->state = STATE_HELLO_WAIT;
1335 }
1336
1337
1338 /**
1339  * Callback to set state to CLOSE_WAIT
1340  *
1341  * @param cls the closure from queue_message
1342  * @param socket the socket requiring state change
1343  */
1344 static void
1345 set_state_close_wait (void *cls,
1346                       struct GNUNET_STREAM_Socket *socket)
1347 {
1348   LOG (GNUNET_ERROR_TYPE_DEBUG,
1349        "%s: Attaing CLOSE_WAIT state\n",
1350        GNUNET_i2s (&socket->other_peer));
1351   socket->state = STATE_CLOSE_WAIT;
1352   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1353   socket->receive_buffer = NULL;
1354   socket->receive_buffer_size = 0;
1355 }
1356
1357
1358 /**
1359  * Callback to set state to RECEIVE_CLOSE_WAIT
1360  *
1361  * @param cls the closure from queue_message
1362  * @param socket the socket requiring state change
1363  */
1364 static void
1365 set_state_receive_close_wait (void *cls,
1366                               struct GNUNET_STREAM_Socket *socket)
1367 {
1368   LOG (GNUNET_ERROR_TYPE_DEBUG,
1369        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1370        GNUNET_i2s (&socket->other_peer));
1371   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1372   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1373   socket->receive_buffer = NULL;
1374   socket->receive_buffer_size = 0;
1375 }
1376
1377
1378 /**
1379  * Callback to set state to TRANSMIT_CLOSE_WAIT
1380  *
1381  * @param cls the closure from queue_message
1382  * @param socket the socket requiring state change
1383  */
1384 static void
1385 set_state_transmit_close_wait (void *cls,
1386                                struct GNUNET_STREAM_Socket *socket)
1387 {
1388   LOG (GNUNET_ERROR_TYPE_DEBUG,
1389        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1390        GNUNET_i2s (&socket->other_peer));
1391   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1392 }
1393
1394
1395 /**
1396  * Callback to set state to CLOSED
1397  *
1398  * @param cls the closure from queue_message
1399  * @param socket the socket requiring state change
1400  */
1401 static void
1402 set_state_closed (void *cls,
1403                   struct GNUNET_STREAM_Socket *socket)
1404 {
1405   socket->state = STATE_CLOSED;
1406 }
1407
1408
1409 /**
1410  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1411  * socket
1412  *
1413  * @param socket the socket for which this HelloAckMessage has to be generated
1414  * @return the HelloAckMessage
1415  */
1416 static struct GNUNET_STREAM_HelloAckMessage *
1417 generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1418 {
1419   struct GNUNET_STREAM_HelloAckMessage *msg;
1420
1421   /* Get the random sequence number */
1422   if (GNUNET_YES == socket->testing_active)
1423     socket->write_sequence_number =
1424       socket->testing_set_write_sequence_number_value;
1425   else
1426     socket->write_sequence_number = 
1427       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1428   LOG (GNUNET_ERROR_TYPE_DEBUG,
1429        "%s: write sequence number %u\n",
1430        GNUNET_i2s (&socket->other_peer),
1431        (unsigned int) socket->write_sequence_number);
1432   
1433   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1434   msg->header.header.size = 
1435     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1436   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1437   msg->sequence_number = htonl (socket->write_sequence_number);
1438   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1439
1440   return msg;
1441 }
1442
1443
1444 /**
1445  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1446  *
1447  * @param cls the socket (set from GNUNET_MESH_connect)
1448  * @param tunnel connection to the other end
1449  * @param tunnel_ctx this is NULL
1450  * @param sender who sent the message
1451  * @param message the actual message
1452  * @param atsi performance data for the connection
1453  * @return GNUNET_OK to keep the connection open,
1454  *         GNUNET_SYSERR to close it (signal serious error)
1455  */
1456 static int
1457 client_handle_hello_ack (void *cls,
1458                          struct GNUNET_MESH_Tunnel *tunnel,
1459                          void **tunnel_ctx,
1460                          const struct GNUNET_PeerIdentity *sender,
1461                          const struct GNUNET_MessageHeader *message,
1462                          const struct GNUNET_ATS_Information*atsi)
1463 {
1464   struct GNUNET_STREAM_Socket *socket = cls;
1465   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1466   struct GNUNET_STREAM_HelloAckMessage *reply;
1467
1468   if (0 != memcmp (sender,
1469                    &socket->other_peer,
1470                    sizeof (struct GNUNET_PeerIdentity)))
1471   {
1472     LOG (GNUNET_ERROR_TYPE_DEBUG,
1473          "%s: Received HELLO_ACK from non-confirming peer\n",
1474          GNUNET_i2s (&socket->other_peer));
1475     return GNUNET_YES;
1476   }
1477   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1478   LOG (GNUNET_ERROR_TYPE_DEBUG,
1479        "%s: Received HELLO_ACK from %s\n",
1480        GNUNET_i2s (&socket->other_peer),
1481        GNUNET_i2s (&socket->other_peer));
1482
1483   GNUNET_assert (socket->tunnel == tunnel);
1484   switch (socket->state)
1485   {
1486   case STATE_HELLO_WAIT:
1487     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1488     LOG (GNUNET_ERROR_TYPE_DEBUG,
1489          "%s: Read sequence number %u\n",
1490          GNUNET_i2s (&socket->other_peer),
1491          (unsigned int) socket->read_sequence_number);
1492     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1493     reply = generate_hello_ack_msg (socket);
1494     queue_message (socket,
1495                    &reply->header, 
1496                    &set_state_established, 
1497                    NULL);      
1498     return GNUNET_OK;
1499   case STATE_ESTABLISHED:
1500   case STATE_RECEIVE_CLOSE_WAIT:
1501     // call statistics (# ACKs ignored++)
1502     return GNUNET_OK;
1503   case STATE_INIT:
1504   default:
1505     LOG (GNUNET_ERROR_TYPE_DEBUG,
1506          "%s: Server %s sent HELLO_ACK when in state %d\n", 
1507          GNUNET_i2s (&socket->other_peer),
1508          GNUNET_i2s (&socket->other_peer),
1509          socket->state);
1510     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1511     return GNUNET_SYSERR;
1512   }
1513
1514 }
1515
1516
1517 /**
1518  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1519  *
1520  * @param cls the socket (set from GNUNET_MESH_connect)
1521  * @param tunnel connection to the other end
1522  * @param tunnel_ctx this is NULL
1523  * @param sender who sent the message
1524  * @param message the actual message
1525  * @param atsi performance data for the connection
1526  * @return GNUNET_OK to keep the connection open,
1527  *         GNUNET_SYSERR to close it (signal serious error)
1528  */
1529 static int
1530 client_handle_reset (void *cls,
1531                      struct GNUNET_MESH_Tunnel *tunnel,
1532                      void **tunnel_ctx,
1533                      const struct GNUNET_PeerIdentity *sender,
1534                      const struct GNUNET_MessageHeader *message,
1535                      const struct GNUNET_ATS_Information*atsi)
1536 {
1537   // struct GNUNET_STREAM_Socket *socket = cls;
1538
1539   return GNUNET_OK;
1540 }
1541
1542
1543 /**
1544  * Common message handler for handling TRANSMIT_CLOSE messages
1545  *
1546  * @param socket the socket through which the ack was received
1547  * @param tunnel connection to the other end
1548  * @param sender who sent the message
1549  * @param msg the transmit close message
1550  * @param atsi performance data for the connection
1551  * @return GNUNET_OK to keep the connection open,
1552  *         GNUNET_SYSERR to close it (signal serious error)
1553  */
1554 static int
1555 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1556                        struct GNUNET_MESH_Tunnel *tunnel,
1557                        const struct GNUNET_PeerIdentity *sender,
1558                        const struct GNUNET_STREAM_MessageHeader *msg,
1559                        const struct GNUNET_ATS_Information*atsi)
1560 {
1561   struct GNUNET_STREAM_MessageHeader *reply;
1562
1563   switch (socket->state)
1564   {
1565   case STATE_ESTABLISHED:
1566     socket->state = STATE_RECEIVE_CLOSED;
1567
1568     /* Send TRANSMIT_CLOSE_ACK */
1569     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1570     reply->header.type = 
1571       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1572     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1573     queue_message (socket, reply, NULL, NULL);
1574     break;
1575
1576   default:
1577     /* FIXME: Call statistics? */
1578     break;
1579   }
1580   return GNUNET_YES;
1581 }
1582
1583
1584 /**
1585  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1586  *
1587  * @param cls the socket (set from GNUNET_MESH_connect)
1588  * @param tunnel connection to the other end
1589  * @param tunnel_ctx this is NULL
1590  * @param sender who sent the message
1591  * @param message the actual message
1592  * @param atsi performance data for the connection
1593  * @return GNUNET_OK to keep the connection open,
1594  *         GNUNET_SYSERR to close it (signal serious error)
1595  */
1596 static int
1597 client_handle_transmit_close (void *cls,
1598                               struct GNUNET_MESH_Tunnel *tunnel,
1599                               void **tunnel_ctx,
1600                               const struct GNUNET_PeerIdentity *sender,
1601                               const struct GNUNET_MessageHeader *message,
1602                               const struct GNUNET_ATS_Information*atsi)
1603 {
1604   struct GNUNET_STREAM_Socket *socket = cls;
1605   
1606   return handle_transmit_close (socket,
1607                                 tunnel,
1608                                 sender,
1609                                 (struct GNUNET_STREAM_MessageHeader *)message,
1610                                 atsi);
1611 }
1612
1613
1614 /**
1615  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1616  *
1617  * @param socket the socket
1618  * @param tunnel connection to the other end
1619  * @param sender who sent the message
1620  * @param message the actual message
1621  * @param atsi performance data for the connection
1622  * @param operation the close operation which is being ACK'ed
1623  * @return GNUNET_OK to keep the connection open,
1624  *         GNUNET_SYSERR to close it (signal serious error)
1625  */
1626 static int
1627 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1628                           struct GNUNET_MESH_Tunnel *tunnel,
1629                           const struct GNUNET_PeerIdentity *sender,
1630                           const struct GNUNET_STREAM_MessageHeader *message,
1631                           const struct GNUNET_ATS_Information *atsi,
1632                           int operation)
1633 {
1634   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1635
1636   shutdown_handle = socket->shutdown_handle;
1637   if (NULL == shutdown_handle)
1638   {
1639     LOG (GNUNET_ERROR_TYPE_DEBUG,
1640          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1641          GNUNET_i2s (&socket->other_peer));
1642     return GNUNET_OK;
1643   }
1644
1645   switch (operation)
1646   {
1647   case SHUT_RDWR:
1648     switch (socket->state)
1649     {
1650     case STATE_CLOSE_WAIT:
1651       if (SHUT_RDWR != shutdown_handle->operation)
1652       {
1653         LOG (GNUNET_ERROR_TYPE_DEBUG,
1654              "%s: Received CLOSE_ACK when shutdown handle is not for "
1655              "SHUT_RDWR\n",
1656              GNUNET_i2s (&socket->other_peer));
1657         return GNUNET_OK;
1658       }
1659
1660       LOG (GNUNET_ERROR_TYPE_DEBUG,
1661            "%s: Received CLOSE_ACK from %s\n",
1662            GNUNET_i2s (&socket->other_peer),
1663            GNUNET_i2s (&socket->other_peer));
1664       socket->state = STATE_CLOSED;
1665       break;
1666     default:
1667       LOG (GNUNET_ERROR_TYPE_DEBUG,
1668            "%s: Received CLOSE_ACK when in it not expected\n",
1669            GNUNET_i2s (&socket->other_peer));
1670       return GNUNET_OK;
1671     }
1672     break;
1673
1674   case SHUT_RD:
1675     switch (socket->state)
1676     {
1677     case STATE_RECEIVE_CLOSE_WAIT:
1678       if (SHUT_RD != shutdown_handle->operation)
1679       {
1680         LOG (GNUNET_ERROR_TYPE_DEBUG,
1681              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1682              "is not for SHUT_RD\n",
1683              GNUNET_i2s (&socket->other_peer));
1684         return GNUNET_OK;
1685       }
1686
1687       LOG (GNUNET_ERROR_TYPE_DEBUG,
1688            "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1689            GNUNET_i2s (&socket->other_peer),
1690            GNUNET_i2s (&socket->other_peer));
1691       socket->state = STATE_RECEIVE_CLOSED;
1692       break;
1693     default:
1694       LOG (GNUNET_ERROR_TYPE_DEBUG,
1695            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1696            GNUNET_i2s (&socket->other_peer));
1697       return GNUNET_OK;
1698     }
1699
1700     break;
1701   case SHUT_WR:
1702     switch (socket->state)
1703     {
1704     case STATE_TRANSMIT_CLOSE_WAIT:
1705       if (SHUT_WR != shutdown_handle->operation)
1706       {
1707         LOG (GNUNET_ERROR_TYPE_DEBUG,
1708              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1709              "is not for SHUT_WR\n",
1710              GNUNET_i2s (&socket->other_peer));
1711         return GNUNET_OK;
1712       }
1713
1714       LOG (GNUNET_ERROR_TYPE_DEBUG,
1715            "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1716            GNUNET_i2s (&socket->other_peer),
1717            GNUNET_i2s (&socket->other_peer));
1718       socket->state = STATE_TRANSMIT_CLOSED;
1719       break;
1720     default:
1721       LOG (GNUNET_ERROR_TYPE_DEBUG,
1722            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1723            GNUNET_i2s (&socket->other_peer));
1724           
1725       return GNUNET_OK;
1726     }
1727     break;
1728   default:
1729     GNUNET_assert (0);
1730   }
1731
1732   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1733     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1734                                    operation);
1735   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1736   socket->shutdown_handle = NULL;
1737   if (GNUNET_SCHEDULER_NO_TASK
1738       != shutdown_handle->close_msg_retransmission_task_id)
1739   {
1740     GNUNET_SCHEDULER_cancel
1741       (shutdown_handle->close_msg_retransmission_task_id);
1742     shutdown_handle->close_msg_retransmission_task_id =
1743       GNUNET_SCHEDULER_NO_TASK;
1744   }
1745   return GNUNET_OK;
1746 }
1747
1748
1749 /**
1750  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1751  *
1752  * @param cls the socket (set from GNUNET_MESH_connect)
1753  * @param tunnel connection to the other end
1754  * @param tunnel_ctx this is NULL
1755  * @param sender who sent the message
1756  * @param message the actual message
1757  * @param atsi performance data for the connection
1758  * @return GNUNET_OK to keep the connection open,
1759  *         GNUNET_SYSERR to close it (signal serious error)
1760  */
1761 static int
1762 client_handle_transmit_close_ack (void *cls,
1763                                   struct GNUNET_MESH_Tunnel *tunnel,
1764                                   void **tunnel_ctx,
1765                                   const struct GNUNET_PeerIdentity *sender,
1766                                   const struct GNUNET_MessageHeader *message,
1767                                   const struct GNUNET_ATS_Information*atsi)
1768 {
1769   struct GNUNET_STREAM_Socket *socket = cls;
1770
1771   return handle_generic_close_ack (socket,
1772                                    tunnel,
1773                                    sender,
1774                                    (const struct GNUNET_STREAM_MessageHeader *)
1775                                    message,
1776                                    atsi,
1777                                    SHUT_WR);
1778 }
1779
1780
1781 /**
1782  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1783  *
1784  * @param socket the socket
1785  * @param tunnel connection to the other end
1786  * @param sender who sent the message
1787  * @param message the actual message
1788  * @param atsi performance data for the connection
1789  * @return GNUNET_OK to keep the connection open,
1790  *         GNUNET_SYSERR to close it (signal serious error)
1791  */
1792 static int
1793 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1794                       struct GNUNET_MESH_Tunnel *tunnel,
1795                       const struct GNUNET_PeerIdentity *sender,
1796                       const struct GNUNET_STREAM_MessageHeader *message,
1797                       const struct GNUNET_ATS_Information *atsi)
1798 {
1799   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1800
1801   switch (socket->state)
1802   {
1803   case STATE_INIT:
1804   case STATE_LISTEN:
1805   case STATE_HELLO_WAIT:
1806     LOG (GNUNET_ERROR_TYPE_DEBUG,
1807          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1808          GNUNET_i2s (&socket->other_peer));
1809     return GNUNET_OK;
1810   default:
1811     break;
1812   }
1813   
1814   LOG (GNUNET_ERROR_TYPE_DEBUG,
1815        "%s: Received RECEIVE_CLOSE from %s\n",
1816        GNUNET_i2s (&socket->other_peer),
1817        GNUNET_i2s (&socket->other_peer));
1818   receive_close_ack =
1819     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1820   receive_close_ack->header.size =
1821     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1822   receive_close_ack->header.type =
1823     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1824   queue_message (socket,
1825                  receive_close_ack,
1826                  &set_state_closed,
1827                  NULL);
1828   
1829   /* FIXME: Handle the case where write handle is present; the write operation
1830      should be deemed as finised and the write continuation callback
1831      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1832   return GNUNET_OK;
1833 }
1834
1835
1836 /**
1837  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1838  *
1839  * @param cls the socket (set from GNUNET_MESH_connect)
1840  * @param tunnel connection to the other end
1841  * @param tunnel_ctx this is NULL
1842  * @param sender who sent the message
1843  * @param message the actual message
1844  * @param atsi performance data for the connection
1845  * @return GNUNET_OK to keep the connection open,
1846  *         GNUNET_SYSERR to close it (signal serious error)
1847  */
1848 static int
1849 client_handle_receive_close (void *cls,
1850                              struct GNUNET_MESH_Tunnel *tunnel,
1851                              void **tunnel_ctx,
1852                              const struct GNUNET_PeerIdentity *sender,
1853                              const struct GNUNET_MessageHeader *message,
1854                              const struct GNUNET_ATS_Information*atsi)
1855 {
1856   struct GNUNET_STREAM_Socket *socket = cls;
1857
1858   return
1859     handle_receive_close (socket,
1860                           tunnel,
1861                           sender,
1862                           (const struct GNUNET_STREAM_MessageHeader *) message,
1863                           atsi);
1864 }
1865
1866
1867 /**
1868  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1869  *
1870  * @param cls the socket (set from GNUNET_MESH_connect)
1871  * @param tunnel connection to the other end
1872  * @param tunnel_ctx this is NULL
1873  * @param sender who sent the message
1874  * @param message the actual message
1875  * @param atsi performance data for the connection
1876  * @return GNUNET_OK to keep the connection open,
1877  *         GNUNET_SYSERR to close it (signal serious error)
1878  */
1879 static int
1880 client_handle_receive_close_ack (void *cls,
1881                                  struct GNUNET_MESH_Tunnel *tunnel,
1882                                  void **tunnel_ctx,
1883                                  const struct GNUNET_PeerIdentity *sender,
1884                                  const struct GNUNET_MessageHeader *message,
1885                                  const struct GNUNET_ATS_Information*atsi)
1886 {
1887   struct GNUNET_STREAM_Socket *socket = cls;
1888
1889   return handle_generic_close_ack (socket,
1890                                    tunnel,
1891                                    sender,
1892                                    (const struct GNUNET_STREAM_MessageHeader *)
1893                                    message,
1894                                    atsi,
1895                                    SHUT_RD);
1896 }
1897
1898
1899 /**
1900  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1901  *
1902  * @param socket the socket
1903  * @param tunnel connection to the other end
1904  * @param sender who sent the message
1905  * @param message the actual message
1906  * @param atsi performance data for the connection
1907  * @return GNUNET_OK to keep the connection open,
1908  *         GNUNET_SYSERR to close it (signal serious error)
1909  */
1910 static int
1911 handle_close (struct GNUNET_STREAM_Socket *socket,
1912               struct GNUNET_MESH_Tunnel *tunnel,
1913               const struct GNUNET_PeerIdentity *sender,
1914               const struct GNUNET_STREAM_MessageHeader *message,
1915               const struct GNUNET_ATS_Information*atsi)
1916 {
1917   struct GNUNET_STREAM_MessageHeader *close_ack;
1918
1919   switch (socket->state)
1920   {
1921   case STATE_INIT:
1922   case STATE_LISTEN:
1923   case STATE_HELLO_WAIT:
1924     LOG (GNUNET_ERROR_TYPE_DEBUG,
1925          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1926          GNUNET_i2s (&socket->other_peer));
1927     return GNUNET_OK;
1928   default:
1929     break;
1930   }
1931
1932   LOG (GNUNET_ERROR_TYPE_DEBUG,
1933        "%s: Received CLOSE from %s\n",
1934        GNUNET_i2s (&socket->other_peer),
1935        GNUNET_i2s (&socket->other_peer));
1936   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1937   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1938   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1939   queue_message (socket,
1940                  close_ack,
1941                  &set_state_closed,
1942                  NULL);
1943   if (socket->state == STATE_CLOSED)
1944     return GNUNET_OK;
1945
1946   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1947   socket->receive_buffer = NULL;
1948   socket->receive_buffer_size = 0;
1949   return GNUNET_OK;
1950 }
1951
1952
1953 /**
1954  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1955  *
1956  * @param cls the socket (set from GNUNET_MESH_connect)
1957  * @param tunnel connection to the other end
1958  * @param tunnel_ctx this is NULL
1959  * @param sender who sent the message
1960  * @param message the actual message
1961  * @param atsi performance data for the connection
1962  * @return GNUNET_OK to keep the connection open,
1963  *         GNUNET_SYSERR to close it (signal serious error)
1964  */
1965 static int
1966 client_handle_close (void *cls,
1967                      struct GNUNET_MESH_Tunnel *tunnel,
1968                      void **tunnel_ctx,
1969                      const struct GNUNET_PeerIdentity *sender,
1970                      const struct GNUNET_MessageHeader *message,
1971                      const struct GNUNET_ATS_Information*atsi)
1972 {
1973   struct GNUNET_STREAM_Socket *socket = cls;
1974
1975   return handle_close (socket,
1976                        tunnel,
1977                        sender,
1978                        (const struct GNUNET_STREAM_MessageHeader *) message,
1979                        atsi);
1980 }
1981
1982
1983 /**
1984  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1985  *
1986  * @param cls the socket (set from GNUNET_MESH_connect)
1987  * @param tunnel connection to the other end
1988  * @param tunnel_ctx this is NULL
1989  * @param sender who sent the message
1990  * @param message the actual message
1991  * @param atsi performance data for the connection
1992  * @return GNUNET_OK to keep the connection open,
1993  *         GNUNET_SYSERR to close it (signal serious error)
1994  */
1995 static int
1996 client_handle_close_ack (void *cls,
1997                          struct GNUNET_MESH_Tunnel *tunnel,
1998                          void **tunnel_ctx,
1999                          const struct GNUNET_PeerIdentity *sender,
2000                          const struct GNUNET_MessageHeader *message,
2001                          const struct GNUNET_ATS_Information *atsi)
2002 {
2003   struct GNUNET_STREAM_Socket *socket = cls;
2004
2005   return handle_generic_close_ack (socket,
2006                                    tunnel,
2007                                    sender,
2008                                    (const struct GNUNET_STREAM_MessageHeader *) 
2009                                    message,
2010                                    atsi,
2011                                    SHUT_RDWR);
2012 }
2013
2014 /*****************************/
2015 /* Server's Message Handlers */
2016 /*****************************/
2017
2018 /**
2019  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2020  *
2021  * @param cls the closure
2022  * @param tunnel connection to the other end
2023  * @param tunnel_ctx the socket
2024  * @param sender who sent the message
2025  * @param message the actual message
2026  * @param atsi performance data for the connection
2027  * @return GNUNET_OK to keep the connection open,
2028  *         GNUNET_SYSERR to close it (signal serious error)
2029  */
2030 static int
2031 server_handle_data (void *cls,
2032                     struct GNUNET_MESH_Tunnel *tunnel,
2033                     void **tunnel_ctx,
2034                     const struct GNUNET_PeerIdentity *sender,
2035                     const struct GNUNET_MessageHeader *message,
2036                     const struct GNUNET_ATS_Information*atsi)
2037 {
2038   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2039
2040   return handle_data (socket,
2041                       tunnel,
2042                       sender,
2043                       (const struct GNUNET_STREAM_DataMessage *)message,
2044                       atsi);
2045 }
2046
2047
2048 /**
2049  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2050  *
2051  * @param cls the closure
2052  * @param tunnel connection to the other end
2053  * @param tunnel_ctx the socket
2054  * @param sender who sent the message
2055  * @param message the actual message
2056  * @param atsi performance data for the connection
2057  * @return GNUNET_OK to keep the connection open,
2058  *         GNUNET_SYSERR to close it (signal serious error)
2059  */
2060 static int
2061 server_handle_hello (void *cls,
2062                      struct GNUNET_MESH_Tunnel *tunnel,
2063                      void **tunnel_ctx,
2064                      const struct GNUNET_PeerIdentity *sender,
2065                      const struct GNUNET_MessageHeader *message,
2066                      const struct GNUNET_ATS_Information*atsi)
2067 {
2068   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2069   struct GNUNET_STREAM_HelloAckMessage *reply;
2070
2071   if (0 != memcmp (sender,
2072                    &socket->other_peer,
2073                    sizeof (struct GNUNET_PeerIdentity)))
2074   {
2075     LOG (GNUNET_ERROR_TYPE_DEBUG,
2076          "%s: Received HELLO from non-confirming peer\n",
2077          GNUNET_i2s (&socket->other_peer));
2078     return GNUNET_YES;
2079   }
2080
2081   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
2082                  ntohs (message->type));
2083   GNUNET_assert (socket->tunnel == tunnel);
2084   LOG (GNUNET_ERROR_TYPE_DEBUG,
2085        "%s: Received HELLO from %s\n", 
2086        GNUNET_i2s (&socket->other_peer),
2087        GNUNET_i2s (&socket->other_peer));
2088
2089   if (STATE_INIT == socket->state)
2090   {
2091     reply = generate_hello_ack_msg (socket);
2092     queue_message (socket, 
2093                    &reply->header,
2094                    &set_state_hello_wait, 
2095                    NULL);
2096   }
2097   else
2098   {
2099     LOG (GNUNET_ERROR_TYPE_DEBUG,
2100          "%s: Client sent HELLO when in state %d\n", 
2101          GNUNET_i2s (&socket->other_peer),
2102          socket->state);
2103     /* FIXME: Send RESET? */
2104       
2105   }
2106   return GNUNET_OK;
2107 }
2108
2109
2110 /**
2111  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2112  *
2113  * @param cls the closure
2114  * @param tunnel connection to the other end
2115  * @param tunnel_ctx the socket
2116  * @param sender who sent the message
2117  * @param message the actual message
2118  * @param atsi performance data for the connection
2119  * @return GNUNET_OK to keep the connection open,
2120  *         GNUNET_SYSERR to close it (signal serious error)
2121  */
2122 static int
2123 server_handle_hello_ack (void *cls,
2124                          struct GNUNET_MESH_Tunnel *tunnel,
2125                          void **tunnel_ctx,
2126                          const struct GNUNET_PeerIdentity *sender,
2127                          const struct GNUNET_MessageHeader *message,
2128                          const struct GNUNET_ATS_Information*atsi)
2129 {
2130   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2131   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2132
2133   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2134                  ntohs (message->type));
2135   GNUNET_assert (socket->tunnel == tunnel);
2136   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2137   if (STATE_HELLO_WAIT == socket->state)
2138   {
2139     LOG (GNUNET_ERROR_TYPE_DEBUG,
2140          "%s: Received HELLO_ACK from %s\n",
2141          GNUNET_i2s (&socket->other_peer),
2142          GNUNET_i2s (&socket->other_peer));
2143     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2144     LOG (GNUNET_ERROR_TYPE_DEBUG,
2145          "%s: Read sequence number %u\n",
2146          GNUNET_i2s (&socket->other_peer),
2147          (unsigned int) socket->read_sequence_number);
2148     socket->receiver_window_available = 
2149       ntohl (ack_message->receiver_window_size);
2150     /* Attain ESTABLISHED state */
2151     set_state_established (NULL, socket);
2152   }
2153   else
2154   {
2155     LOG (GNUNET_ERROR_TYPE_DEBUG,
2156          "Client sent HELLO_ACK when in state %d\n", socket->state);
2157     /* FIXME: Send RESET? */
2158       
2159   }
2160   return GNUNET_OK;
2161 }
2162
2163
2164 /**
2165  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2166  *
2167  * @param cls the closure
2168  * @param tunnel connection to the other end
2169  * @param tunnel_ctx the socket
2170  * @param sender who sent the message
2171  * @param message the actual message
2172  * @param atsi performance data for the connection
2173  * @return GNUNET_OK to keep the connection open,
2174  *         GNUNET_SYSERR to close it (signal serious error)
2175  */
2176 static int
2177 server_handle_reset (void *cls,
2178                      struct GNUNET_MESH_Tunnel *tunnel,
2179                      void **tunnel_ctx,
2180                      const struct GNUNET_PeerIdentity *sender,
2181                      const struct GNUNET_MessageHeader *message,
2182                      const struct GNUNET_ATS_Information*atsi)
2183 {
2184   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2185
2186   return GNUNET_OK;
2187 }
2188
2189
2190 /**
2191  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2192  *
2193  * @param cls the closure
2194  * @param tunnel connection to the other end
2195  * @param tunnel_ctx the socket
2196  * @param sender who sent the message
2197  * @param message the actual message
2198  * @param atsi performance data for the connection
2199  * @return GNUNET_OK to keep the connection open,
2200  *         GNUNET_SYSERR to close it (signal serious error)
2201  */
2202 static int
2203 server_handle_transmit_close (void *cls,
2204                               struct GNUNET_MESH_Tunnel *tunnel,
2205                               void **tunnel_ctx,
2206                               const struct GNUNET_PeerIdentity *sender,
2207                               const struct GNUNET_MessageHeader *message,
2208                               const struct GNUNET_ATS_Information*atsi)
2209 {
2210   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2211
2212   return handle_transmit_close (socket,
2213                                 tunnel,
2214                                 sender,
2215                                 (struct GNUNET_STREAM_MessageHeader *)message,
2216                                 atsi);
2217 }
2218
2219
2220 /**
2221  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2222  *
2223  * @param cls the closure
2224  * @param tunnel connection to the other end
2225  * @param tunnel_ctx the socket
2226  * @param sender who sent the message
2227  * @param message the actual message
2228  * @param atsi performance data for the connection
2229  * @return GNUNET_OK to keep the connection open,
2230  *         GNUNET_SYSERR to close it (signal serious error)
2231  */
2232 static int
2233 server_handle_transmit_close_ack (void *cls,
2234                                   struct GNUNET_MESH_Tunnel *tunnel,
2235                                   void **tunnel_ctx,
2236                                   const struct GNUNET_PeerIdentity *sender,
2237                                   const struct GNUNET_MessageHeader *message,
2238                                   const struct GNUNET_ATS_Information*atsi)
2239 {
2240   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2241
2242   return handle_generic_close_ack (socket,
2243                                    tunnel,
2244                                    sender,
2245                                    (const struct GNUNET_STREAM_MessageHeader *)
2246                                    message,
2247                                    atsi,
2248                                    SHUT_WR);
2249 }
2250
2251
2252 /**
2253  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2254  *
2255  * @param cls the closure
2256  * @param tunnel connection to the other end
2257  * @param tunnel_ctx the socket
2258  * @param sender who sent the message
2259  * @param message the actual message
2260  * @param atsi performance data for the connection
2261  * @return GNUNET_OK to keep the connection open,
2262  *         GNUNET_SYSERR to close it (signal serious error)
2263  */
2264 static int
2265 server_handle_receive_close (void *cls,
2266                              struct GNUNET_MESH_Tunnel *tunnel,
2267                              void **tunnel_ctx,
2268                              const struct GNUNET_PeerIdentity *sender,
2269                              const struct GNUNET_MessageHeader *message,
2270                              const struct GNUNET_ATS_Information*atsi)
2271 {
2272   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2273
2274   return
2275     handle_receive_close (socket,
2276                           tunnel,
2277                           sender,
2278                           (const struct GNUNET_STREAM_MessageHeader *) message,
2279                           atsi);
2280 }
2281
2282
2283 /**
2284  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2285  *
2286  * @param cls the closure
2287  * @param tunnel connection to the other end
2288  * @param tunnel_ctx the socket
2289  * @param sender who sent the message
2290  * @param message the actual message
2291  * @param atsi performance data for the connection
2292  * @return GNUNET_OK to keep the connection open,
2293  *         GNUNET_SYSERR to close it (signal serious error)
2294  */
2295 static int
2296 server_handle_receive_close_ack (void *cls,
2297                                  struct GNUNET_MESH_Tunnel *tunnel,
2298                                  void **tunnel_ctx,
2299                                  const struct GNUNET_PeerIdentity *sender,
2300                                  const struct GNUNET_MessageHeader *message,
2301                                  const struct GNUNET_ATS_Information*atsi)
2302 {
2303   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2304
2305   return handle_generic_close_ack (socket,
2306                                    tunnel,
2307                                    sender,
2308                                    (const struct GNUNET_STREAM_MessageHeader *)
2309                                    message,
2310                                    atsi,
2311                                    SHUT_RD);
2312 }
2313
2314
2315 /**
2316  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2317  *
2318  * @param cls the listen socket (from GNUNET_MESH_connect in
2319  *          GNUNET_STREAM_listen) 
2320  * @param tunnel connection to the other end
2321  * @param tunnel_ctx the socket
2322  * @param sender who sent the message
2323  * @param message the actual message
2324  * @param atsi performance data for the connection
2325  * @return GNUNET_OK to keep the connection open,
2326  *         GNUNET_SYSERR to close it (signal serious error)
2327  */
2328 static int
2329 server_handle_close (void *cls,
2330                      struct GNUNET_MESH_Tunnel *tunnel,
2331                      void **tunnel_ctx,
2332                      const struct GNUNET_PeerIdentity *sender,
2333                      const struct GNUNET_MessageHeader *message,
2334                      const struct GNUNET_ATS_Information*atsi)
2335 {
2336   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2337   
2338   return handle_close (socket,
2339                        tunnel,
2340                        sender,
2341                        (const struct GNUNET_STREAM_MessageHeader *) message,
2342                        atsi);
2343 }
2344
2345
2346 /**
2347  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2348  *
2349  * @param cls the closure
2350  * @param tunnel connection to the other end
2351  * @param tunnel_ctx the socket
2352  * @param sender who sent the message
2353  * @param message the actual message
2354  * @param atsi performance data for the connection
2355  * @return GNUNET_OK to keep the connection open,
2356  *         GNUNET_SYSERR to close it (signal serious error)
2357  */
2358 static int
2359 server_handle_close_ack (void *cls,
2360                          struct GNUNET_MESH_Tunnel *tunnel,
2361                          void **tunnel_ctx,
2362                          const struct GNUNET_PeerIdentity *sender,
2363                          const struct GNUNET_MessageHeader *message,
2364                          const struct GNUNET_ATS_Information*atsi)
2365 {
2366   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2367
2368   return handle_generic_close_ack (socket,
2369                                    tunnel,
2370                                    sender,
2371                                    (const struct GNUNET_STREAM_MessageHeader *) 
2372                                    message,
2373                                    atsi,
2374                                    SHUT_RDWR);
2375 }
2376
2377
2378 /**
2379  * Handler for DATA_ACK messages
2380  *
2381  * @param socket the socket through which the ack was received
2382  * @param tunnel connection to the other end
2383  * @param sender who sent the message
2384  * @param ack the acknowledgment message
2385  * @param atsi performance data for the connection
2386  * @return GNUNET_OK to keep the connection open,
2387  *         GNUNET_SYSERR to close it (signal serious error)
2388  */
2389 static int
2390 handle_ack (struct GNUNET_STREAM_Socket *socket,
2391             struct GNUNET_MESH_Tunnel *tunnel,
2392             const struct GNUNET_PeerIdentity *sender,
2393             const struct GNUNET_STREAM_AckMessage *ack,
2394             const struct GNUNET_ATS_Information*atsi)
2395 {
2396   unsigned int packet;
2397   int need_retransmission;
2398   uint32_t sequence_difference;
2399   
2400   if (0 != memcmp (sender,
2401                    &socket->other_peer,
2402                    sizeof (struct GNUNET_PeerIdentity)))
2403   {
2404     LOG (GNUNET_ERROR_TYPE_DEBUG,
2405          "%s: Received ACK from non-confirming peer\n",
2406          GNUNET_i2s (&socket->other_peer));
2407     return GNUNET_YES;
2408   }
2409   switch (socket->state)
2410   {
2411   case (STATE_ESTABLISHED):
2412   case (STATE_RECEIVE_CLOSED):
2413   case (STATE_RECEIVE_CLOSE_WAIT):
2414     if (NULL == socket->write_handle)
2415     {
2416       LOG (GNUNET_ERROR_TYPE_DEBUG,
2417            "%s: Received DATA_ACK when write_handle is NULL\n",
2418            GNUNET_i2s (&socket->other_peer));
2419       return GNUNET_OK;
2420     }
2421     /* FIXME: increment in the base sequence number is breaking current flow
2422      */
2423     if (!((socket->write_sequence_number 
2424            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2425     {
2426       LOG (GNUNET_ERROR_TYPE_DEBUG,
2427            "%s: Received DATA_ACK with unexpected base sequence number\n",
2428            GNUNET_i2s (&socket->other_peer));
2429       LOG (GNUNET_ERROR_TYPE_DEBUG,
2430            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2431            GNUNET_i2s (&socket->other_peer),
2432            socket->write_sequence_number,
2433            ntohl (ack->base_sequence_number));
2434       return GNUNET_OK;
2435     }
2436     /* FIXME: include the case when write_handle is cancelled - ignore the 
2437        acks */
2438     LOG (GNUNET_ERROR_TYPE_DEBUG,
2439          "%s: Received DATA_ACK from %s\n",
2440          GNUNET_i2s (&socket->other_peer),
2441          GNUNET_i2s (&socket->other_peer));
2442       
2443     /* Cancel the retransmission task */
2444     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2445     {
2446       GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2447       socket->retransmission_timeout_task_id = 
2448         GNUNET_SCHEDULER_NO_TASK;
2449     }
2450     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2451     {
2452       if (NULL == socket->write_handle->messages[packet]) break;
2453       /* BS: Base sequence from ack; PS: sequence num of current packet */
2454       sequence_difference = ntohl (ack->base_sequence_number)
2455         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2456       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2457       if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2458           || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2459               && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
2460       {
2461         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2462                               GNUNET_YES);
2463         continue;
2464       }
2465       if (GNUNET_YES == 
2466           ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2467                                 -sequence_difference))/*inversion as PS >= BS */
2468       {
2469         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
2470                               GNUNET_YES);
2471       }
2472     }
2473     /* Update the receive window remaining
2474        FIXME : Should update with the value from a data ack with greater
2475        sequence number */
2476     socket->receiver_window_available = 
2477       ntohl (ack->receive_window_remaining);
2478     /* Check if we have received all acknowledgements */
2479     need_retransmission = GNUNET_NO;
2480     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2481     {
2482       if (NULL == socket->write_handle->messages[packet]) break;
2483       if (GNUNET_YES != ackbitmap_is_bit_set 
2484           (&socket->write_handle->ack_bitmap,packet))
2485       {
2486         need_retransmission = GNUNET_YES;
2487         break;
2488       }
2489     }
2490     if (GNUNET_YES == need_retransmission)
2491     {
2492       write_data (socket);
2493     }
2494     else      /* We have to call the write continuation callback now */
2495     {
2496       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2497       
2498       /* Free the packets */
2499       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2500       {
2501         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2502       }
2503       write_handle = socket->write_handle;
2504       socket->write_handle = NULL;
2505       if (NULL != write_handle->write_cont)
2506         write_handle->write_cont (write_handle->write_cont_cls,
2507                                   socket->status,
2508                                   write_handle->size);
2509       /* We are done with the write handle - Freeing it */
2510       GNUNET_free (write_handle);
2511       LOG (GNUNET_ERROR_TYPE_DEBUG,
2512            "%s: Write completion callback completed\n",
2513            GNUNET_i2s (&socket->other_peer));      
2514     }
2515     break;
2516   default:
2517     break;
2518   }
2519   return GNUNET_OK;
2520 }
2521
2522
2523 /**
2524  * Handler for DATA_ACK messages
2525  *
2526  * @param cls the 'struct GNUNET_STREAM_Socket'
2527  * @param tunnel connection to the other end
2528  * @param tunnel_ctx unused
2529  * @param sender who sent the message
2530  * @param message the actual message
2531  * @param atsi performance data for the connection
2532  * @return GNUNET_OK to keep the connection open,
2533  *         GNUNET_SYSERR to close it (signal serious error)
2534  */
2535 static int
2536 client_handle_ack (void *cls,
2537                    struct GNUNET_MESH_Tunnel *tunnel,
2538                    void **tunnel_ctx,
2539                    const struct GNUNET_PeerIdentity *sender,
2540                    const struct GNUNET_MessageHeader *message,
2541                    const struct GNUNET_ATS_Information*atsi)
2542 {
2543   struct GNUNET_STREAM_Socket *socket = cls;
2544   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2545  
2546   return handle_ack (socket, tunnel, sender, ack, atsi);
2547 }
2548
2549
2550 /**
2551  * Handler for DATA_ACK messages
2552  *
2553  * @param cls the server's listen socket
2554  * @param tunnel connection to the other end
2555  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2556  * @param sender who sent the message
2557  * @param message the actual message
2558  * @param atsi performance data for the connection
2559  * @return GNUNET_OK to keep the connection open,
2560  *         GNUNET_SYSERR to close it (signal serious error)
2561  */
2562 static int
2563 server_handle_ack (void *cls,
2564                    struct GNUNET_MESH_Tunnel *tunnel,
2565                    void **tunnel_ctx,
2566                    const struct GNUNET_PeerIdentity *sender,
2567                    const struct GNUNET_MessageHeader *message,
2568                    const struct GNUNET_ATS_Information*atsi)
2569 {
2570   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2571   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2572  
2573   return handle_ack (socket, tunnel, sender, ack, atsi);
2574 }
2575
2576
2577 /**
2578  * For client message handlers, the stream socket is in the
2579  * closure argument.
2580  */
2581 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2582   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2583   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2584    sizeof (struct GNUNET_STREAM_AckMessage) },
2585   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2586    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2587   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2588    sizeof (struct GNUNET_STREAM_MessageHeader)},
2589   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2590    sizeof (struct GNUNET_STREAM_MessageHeader)},
2591   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2592    sizeof (struct GNUNET_STREAM_MessageHeader)},
2593   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2594    sizeof (struct GNUNET_STREAM_MessageHeader)},
2595   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2596    sizeof (struct GNUNET_STREAM_MessageHeader)},
2597   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2598    sizeof (struct GNUNET_STREAM_MessageHeader)},
2599   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2600    sizeof (struct GNUNET_STREAM_MessageHeader)},
2601   {NULL, 0, 0}
2602 };
2603
2604
2605 /**
2606  * For server message handlers, the stream socket is in the
2607  * tunnel context, and the listen socket in the closure argument.
2608  */
2609 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2610   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2611   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2612    sizeof (struct GNUNET_STREAM_AckMessage) },
2613   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2614    sizeof (struct GNUNET_STREAM_MessageHeader)},
2615   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2616    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2617   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2618    sizeof (struct GNUNET_STREAM_MessageHeader)},
2619   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2620    sizeof (struct GNUNET_STREAM_MessageHeader)},
2621   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2622    sizeof (struct GNUNET_STREAM_MessageHeader)},
2623   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2624    sizeof (struct GNUNET_STREAM_MessageHeader)},
2625   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2626    sizeof (struct GNUNET_STREAM_MessageHeader)},
2627   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2628    sizeof (struct GNUNET_STREAM_MessageHeader)},
2629   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2630    sizeof (struct GNUNET_STREAM_MessageHeader)},
2631   {NULL, 0, 0}
2632 };
2633
2634
2635 /**
2636  * Function called when our target peer is connected to our tunnel
2637  *
2638  * @param cls the socket for which this tunnel is created
2639  * @param peer the peer identity of the target
2640  * @param atsi performance data for the connection
2641  */
2642 static void
2643 mesh_peer_connect_callback (void *cls,
2644                             const struct GNUNET_PeerIdentity *peer,
2645                             const struct GNUNET_ATS_Information * atsi)
2646 {
2647   struct GNUNET_STREAM_Socket *socket = cls;
2648   struct GNUNET_STREAM_MessageHeader *message;
2649   
2650   if (0 != memcmp (peer,
2651                    &socket->other_peer,
2652                    sizeof (struct GNUNET_PeerIdentity)))
2653   {
2654     LOG (GNUNET_ERROR_TYPE_DEBUG,
2655          "%s: A peer which is not our target has connected to our tunnel\n",
2656          GNUNET_i2s(peer));
2657     return;
2658   }
2659   
2660   LOG (GNUNET_ERROR_TYPE_DEBUG,
2661        "%s: Target peer %s connected\n",
2662        GNUNET_i2s (&socket->other_peer),
2663        GNUNET_i2s (&socket->other_peer));
2664   
2665   /* Set state to INIT */
2666   socket->state = STATE_INIT;
2667
2668   /* Send HELLO message */
2669   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2670   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2671   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2672   queue_message (socket,
2673                  message,
2674                  &set_state_hello_wait,
2675                  NULL);
2676
2677   /* Call open callback */
2678   if (NULL == socket->open_cb)
2679   {
2680     LOG (GNUNET_ERROR_TYPE_DEBUG,
2681          "STREAM_open callback is NULL\n");
2682   }
2683 }
2684
2685
2686 /**
2687  * Function called when our target peer is disconnected from our tunnel
2688  *
2689  * @param cls the socket associated which this tunnel
2690  * @param peer the peer identity of the target
2691  */
2692 static void
2693 mesh_peer_disconnect_callback (void *cls,
2694                                const struct GNUNET_PeerIdentity *peer)
2695 {
2696   struct GNUNET_STREAM_Socket *socket=cls;
2697   
2698   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2699   LOG (GNUNET_ERROR_TYPE_DEBUG,
2700        "%s: Other peer %s disconnected \n",
2701        GNUNET_i2s (&socket->other_peer),
2702        GNUNET_i2s (&socket->other_peer));
2703 }
2704
2705
2706 /**
2707  * Method called whenever a peer creates a tunnel to us
2708  *
2709  * @param cls closure
2710  * @param tunnel new handle to the tunnel
2711  * @param initiator peer that started the tunnel
2712  * @param atsi performance information for the tunnel
2713  * @return initial tunnel context for the tunnel
2714  *         (can be NULL -- that's not an error)
2715  */
2716 static void *
2717 new_tunnel_notify (void *cls,
2718                    struct GNUNET_MESH_Tunnel *tunnel,
2719                    const struct GNUNET_PeerIdentity *initiator,
2720                    const struct GNUNET_ATS_Information *atsi)
2721 {
2722   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2723   struct GNUNET_STREAM_Socket *socket;
2724
2725   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2726      from the same peer again until the socket is closed */
2727
2728   if (GNUNET_NO == lsocket->listening)
2729   {
2730     LOG (GNUNET_ERROR_TYPE_DEBUG,
2731          "%s: Destroying tunnel from peer %s as we don't have the lock\n",
2732          GNUNET_i2s (&socket->other_peer),
2733          GNUNET_i2s (&socket->other_peer));
2734     GNUNET_MESH_tunnel_destroy (tunnel);
2735     return NULL;
2736   }
2737   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2738   socket->other_peer = *initiator;
2739   socket->tunnel = tunnel;
2740   socket->session_id = 0;       /* FIXME */
2741   socket->state = STATE_INIT;
2742   socket->lsocket = lsocket;
2743   socket->retransmit_timeout = lsocket->retransmit_timeout;
2744   socket->testing_active = lsocket->testing_active;
2745   socket->testing_set_write_sequence_number_value =
2746     lsocket->testing_set_write_sequence_number_value;
2747     
2748   LOG (GNUNET_ERROR_TYPE_DEBUG,
2749        "%s: Peer %s initiated tunnel to us\n", 
2750        GNUNET_i2s (&socket->other_peer),
2751        GNUNET_i2s (&socket->other_peer));
2752   
2753   /* FIXME: Copy MESH handle from lsocket to socket */
2754   
2755   return socket;
2756 }
2757
2758
2759 /**
2760  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2761  * any associated state.  This function is NOT called if the client has
2762  * explicitly asked for the tunnel to be destroyed using
2763  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2764  * the tunnel.
2765  *
2766  * @param cls closure (set from GNUNET_MESH_connect)
2767  * @param tunnel connection to the other end (henceforth invalid)
2768  * @param tunnel_ctx place where local state associated
2769  *                   with the tunnel is stored
2770  */
2771 static void 
2772 tunnel_cleaner (void *cls,
2773                 const struct GNUNET_MESH_Tunnel *tunnel,
2774                 void *tunnel_ctx)
2775 {
2776   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2777
2778   if (tunnel != socket->tunnel)
2779     return;
2780
2781   GNUNET_break_op(0);
2782   LOG (GNUNET_ERROR_TYPE_DEBUG,
2783        "%s: Peer %s has terminated connection abruptly\n",
2784        GNUNET_i2s (&socket->other_peer),
2785        GNUNET_i2s (&socket->other_peer));
2786
2787   socket->status = GNUNET_STREAM_SHUTDOWN;
2788
2789   /* Clear Transmit handles */
2790   if (NULL != socket->transmit_handle)
2791   {
2792     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2793     socket->transmit_handle = NULL;
2794   }
2795   if (NULL != socket->ack_transmit_handle)
2796   {
2797     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2798     GNUNET_free (socket->ack_msg);
2799     socket->ack_msg = NULL;
2800     socket->ack_transmit_handle = NULL;
2801   }
2802   /* Stop Tasks using socket->tunnel */
2803   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2804   {
2805     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2806     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2807   }
2808   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2809   {
2810     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2811     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2812   }
2813   /* FIXME: Cancel all other tasks using socket->tunnel */
2814   socket->tunnel = NULL;
2815 }
2816
2817
2818 /**
2819  * Callback to signal timeout on lockmanager lock acquire
2820  *
2821  * @param cls the ListenSocket
2822  * @param tc the scheduler task context
2823  */
2824 static void
2825 lockmanager_acquire_timeout (void *cls, 
2826                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2827 {
2828   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2829   GNUNET_STREAM_ListenCallback listen_cb;
2830   void *listen_cb_cls;
2831
2832   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2833   listen_cb = lsocket->listen_cb;
2834   listen_cb_cls = lsocket->listen_cb_cls;
2835   GNUNET_STREAM_listen_close (lsocket);
2836   if (NULL != listen_cb)
2837     listen_cb (listen_cb_cls, NULL, NULL);  
2838 }
2839
2840
2841 /**
2842  * Callback to notify us on the status changes on app_port lock
2843  *
2844  * @param cls the ListenSocket
2845  * @param domain the domain name of the lock
2846  * @param lock the app_port
2847  * @param status the current status of the lock
2848  */
2849 static void
2850 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2851                        enum GNUNET_LOCKMANAGER_Status status)
2852 {
2853   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2854
2855   GNUNET_assert (lock == (uint32_t) lsocket->port);
2856   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2857   {
2858     lsocket->listening = GNUNET_YES;
2859     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2860     {
2861       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2862       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2863     }
2864     if (NULL == lsocket->mesh)
2865     {
2866       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2867
2868       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2869                                            RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
2870                                            lsocket, /* Closure */
2871                                            &new_tunnel_notify,
2872                                            &tunnel_cleaner,
2873                                            server_message_handlers,
2874                                            ports);
2875       GNUNET_assert (NULL != lsocket->mesh);
2876     }
2877   }
2878   if (GNUNET_LOCKMANAGER_RELEASE == status)
2879     lsocket->listening = GNUNET_NO;
2880 }
2881
2882
2883 /*****************/
2884 /* API functions */
2885 /*****************/
2886
2887
2888 /**
2889  * Tries to open a stream to the target peer
2890  *
2891  * @param cfg configuration to use
2892  * @param target the target peer to which the stream has to be opened
2893  * @param app_port the application port number which uniquely identifies this
2894  *            stream
2895  * @param open_cb this function will be called after stream has be established 
2896  * @param open_cb_cls the closure for open_cb
2897  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2898  * @return if successful it returns the stream socket; NULL if stream cannot be
2899  *         opened 
2900  */
2901 struct GNUNET_STREAM_Socket *
2902 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2903                     const struct GNUNET_PeerIdentity *target,
2904                     GNUNET_MESH_ApplicationType app_port,
2905                     GNUNET_STREAM_OpenCallback open_cb,
2906                     void *open_cb_cls,
2907                     ...)
2908 {
2909   struct GNUNET_STREAM_Socket *socket;
2910   enum GNUNET_STREAM_Option option;
2911   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2912   va_list vargs;                /* Variable arguments */
2913
2914   LOG (GNUNET_ERROR_TYPE_DEBUG,
2915        "%s\n", __func__);
2916   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2917   socket->other_peer = *target;
2918   socket->open_cb = open_cb;
2919   socket->open_cls = open_cb_cls;
2920   /* Set defaults */
2921   socket->retransmit_timeout = 
2922     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2923   socket->testing_active = GNUNET_NO;
2924   va_start (vargs, open_cb_cls); /* Parse variable args */
2925   do {
2926     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2927     switch (option)
2928     {
2929     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2930       /* Expect struct GNUNET_TIME_Relative */
2931       socket->retransmit_timeout = va_arg (vargs,
2932                                            struct GNUNET_TIME_Relative);
2933       break;
2934     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2935       socket->testing_active = GNUNET_YES;
2936       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2937                                                                 uint32_t);
2938       break;
2939     case GNUNET_STREAM_OPTION_END:
2940       break;
2941     }
2942   } while (GNUNET_STREAM_OPTION_END != option);
2943   va_end (vargs);               /* End of variable args parsing */
2944   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2945                                       RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
2946                                       socket, /* cls */
2947                                       NULL, /* No inbound tunnel handler */
2948                                       NULL, /* No in-tunnel cleaner */
2949                                       client_message_handlers,
2950                                       ports); /* We don't get inbound tunnels */
2951   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2952   {
2953     GNUNET_free (socket);
2954     return NULL;
2955   }
2956
2957   /* Now create the mesh tunnel to target */
2958   LOG (GNUNET_ERROR_TYPE_DEBUG,
2959        "Creating MESH Tunnel\n");
2960   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2961                                               NULL, /* Tunnel context */
2962                                               &mesh_peer_connect_callback,
2963                                               &mesh_peer_disconnect_callback,
2964                                               socket);
2965   GNUNET_assert (NULL != socket->tunnel);
2966   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2967                                         &socket->other_peer);
2968   
2969   LOG (GNUNET_ERROR_TYPE_DEBUG,
2970        "%s() END\n", __func__);
2971   return socket;
2972 }
2973
2974
2975 /**
2976  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2977  *
2978  * @param socket the stream socket
2979  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2980  * @param completion_cb the callback that will be called upon successful
2981  *          shutdown of given operation
2982  * @param completion_cls the closure for the completion callback
2983  * @return the shutdown handle
2984  */
2985 struct GNUNET_STREAM_ShutdownHandle *
2986 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2987                         int operation,
2988                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2989                         void *completion_cls)
2990 {
2991   struct GNUNET_STREAM_ShutdownHandle *handle;
2992   struct GNUNET_STREAM_MessageHeader *msg;
2993   
2994   GNUNET_assert (NULL == socket->shutdown_handle);
2995
2996   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2997   handle->socket = socket;
2998   handle->completion_cb = completion_cb;
2999   handle->completion_cls = completion_cls;
3000   socket->shutdown_handle = handle;
3001
3002   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3003   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3004   switch (operation)
3005   {
3006   case SHUT_RD:
3007     handle->operation = SHUT_RD;
3008     if (NULL != socket->read_handle)
3009       LOG (GNUNET_ERROR_TYPE_WARNING,
3010            "Existing read handle should be cancelled before shutting"
3011            " down reading\n");
3012     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3013     queue_message (socket,
3014                    msg,
3015                    &set_state_receive_close_wait,
3016                    NULL);
3017     break;
3018   case SHUT_WR:
3019     handle->operation = SHUT_WR;
3020     if (NULL != socket->write_handle)
3021       LOG (GNUNET_ERROR_TYPE_WARNING,
3022            "Existing write handle should be cancelled before shutting"
3023            " down writing\n");
3024     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3025     queue_message (socket,
3026                    msg,
3027                    &set_state_transmit_close_wait,
3028                    NULL);
3029     break;
3030   case SHUT_RDWR:
3031     handle->operation = SHUT_RDWR;
3032     if (NULL != socket->write_handle)
3033       LOG (GNUNET_ERROR_TYPE_WARNING,
3034            "Existing write handle should be cancelled before shutting"
3035            " down writing\n");
3036     if (NULL != socket->read_handle)
3037       LOG (GNUNET_ERROR_TYPE_WARNING,
3038            "Existing read handle should be cancelled before shutting"
3039            " down reading\n");
3040     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3041     queue_message (socket,
3042                    msg,
3043                    &set_state_close_wait,
3044                    NULL);
3045     break;
3046   default:
3047     LOG (GNUNET_ERROR_TYPE_WARNING,
3048          "GNUNET_STREAM_shutdown called with invalid value for "
3049          "parameter operation -- Ignoring\n");
3050     GNUNET_free (msg);
3051     GNUNET_free (handle);
3052     return NULL;
3053   }
3054   handle->close_msg_retransmission_task_id =
3055     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3056                                   &close_msg_retransmission_task,
3057                                   handle);
3058   return handle;
3059 }
3060
3061
3062 /**
3063  * Cancels a pending shutdown
3064  *
3065  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3066  */
3067 void
3068 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3069 {
3070   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3071     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3072   GNUNET_free (handle);
3073 }
3074
3075
3076 /**
3077  * Closes the stream
3078  *
3079  * @param socket the stream socket
3080  */
3081 void
3082 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3083 {
3084   struct MessageQueue *head;
3085
3086   if (NULL != socket->read_handle)
3087   {
3088     LOG (GNUNET_ERROR_TYPE_WARNING,
3089          "Closing STREAM socket when a read handle is pending\n");
3090   }
3091   if (NULL != socket->write_handle)
3092   {
3093     LOG (GNUNET_ERROR_TYPE_WARNING,
3094          "Closing STREAM socket when a write handle is pending\n");
3095     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3096     //socket->write_handle = NULL;
3097   }
3098
3099   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3100   {
3101     /* socket closed with read task pending!? */
3102     GNUNET_break (0);
3103     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3104     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3105   }
3106   
3107   /* Terminate the ack'ing tasks if they are still present */
3108   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3109   {
3110     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3111     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3112   }
3113
3114   /* Clear Transmit handles */
3115   if (NULL != socket->transmit_handle)
3116   {
3117     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3118     socket->transmit_handle = NULL;
3119   }
3120   if (NULL != socket->ack_transmit_handle)
3121   {
3122     GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3123     GNUNET_free (socket->ack_msg);
3124     socket->ack_msg = NULL;
3125     socket->ack_transmit_handle = NULL;
3126   }
3127
3128   /* Clear existing message queue */
3129   while (NULL != (head = socket->queue_head)) {
3130     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3131                                  socket->queue_tail,
3132                                  head);
3133     GNUNET_free (head->message);
3134     GNUNET_free (head);
3135   }
3136
3137   /* Close associated tunnel */
3138   if (NULL != socket->tunnel)
3139   {
3140     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3141     socket->tunnel = NULL;
3142   }
3143
3144   /* Close mesh connection */
3145   if (NULL != socket->mesh && NULL == socket->lsocket)
3146   {
3147     GNUNET_MESH_disconnect (socket->mesh);
3148     socket->mesh = NULL;
3149   }
3150   
3151   /* Release receive buffer */
3152   if (NULL != socket->receive_buffer)
3153   {
3154     GNUNET_free (socket->receive_buffer);
3155   }
3156
3157   GNUNET_free (socket);
3158 }
3159
3160
3161 /**
3162  * Listens for stream connections for a specific application ports
3163  *
3164  * @param cfg the configuration to use
3165  * @param app_port the application port for which new streams will be accepted
3166  * @param listen_cb this function will be called when a peer tries to establish
3167  *            a stream with us
3168  * @param listen_cb_cls closure for listen_cb
3169  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3170  * @return listen socket, NULL for any error
3171  */
3172 struct GNUNET_STREAM_ListenSocket *
3173 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3174                       GNUNET_MESH_ApplicationType app_port,
3175                       GNUNET_STREAM_ListenCallback listen_cb,
3176                       void *listen_cb_cls,
3177                       ...)
3178 {
3179   /* FIXME: Add variable args for passing configration options? */
3180   struct GNUNET_STREAM_ListenSocket *lsocket;
3181   enum GNUNET_STREAM_Option option;
3182   va_list vargs;
3183
3184   GNUNET_assert (NULL != listen_cb);
3185   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3186   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3187   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3188   if (NULL == lsocket->lockmanager)
3189   {
3190     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3191     GNUNET_free (lsocket);
3192     return NULL;
3193   }
3194   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3195   /* Set defaults */
3196   lsocket->retransmit_timeout = 
3197     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3198   lsocket->testing_active = GNUNET_NO;
3199   va_start (vargs, listen_cb_cls);
3200   do {
3201     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3202     switch (option)
3203     {
3204     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3205       lsocket->retransmit_timeout = va_arg (vargs,
3206                                             struct GNUNET_TIME_Relative);
3207       break;
3208     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3209       lsocket->testing_active = GNUNET_YES;
3210       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3211                                                                  uint32_t);
3212       break;
3213     case GNUNET_STREAM_OPTION_END:
3214       break;
3215     }
3216   } while (GNUNET_STREAM_OPTION_END != option);
3217   va_end (vargs);
3218   lsocket->port = app_port;
3219   lsocket->listen_cb = listen_cb;
3220   lsocket->listen_cb_cls = listen_cb_cls;
3221   lsocket->locking_request = 
3222     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3223                                      (uint32_t) lsocket->port,
3224                                      &lock_status_change_cb, lsocket);
3225   lsocket->lockmanager_acquire_timeout_task =
3226     GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS (20),
3227                                   &lockmanager_acquire_timeout, lsocket);
3228   return lsocket;
3229 }
3230
3231
3232 /**
3233  * Closes the listen socket
3234  *
3235  * @param lsocket the listen socket
3236  */
3237 void
3238 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3239 {
3240   /* Close MESH connection */
3241   if (NULL != lsocket->mesh)
3242     GNUNET_MESH_disconnect (lsocket->mesh);
3243   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3244   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3245     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3246   if (NULL != lsocket->locking_request)
3247     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3248   if (NULL != lsocket->lockmanager)
3249     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3250   GNUNET_free (lsocket);
3251 }
3252
3253
3254 /**
3255  * Tries to write the given data to the stream. The maximum size of data that
3256  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3257  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3258  * violation, however only the said number of maximum bytes will be written.
3259  *
3260  * @param socket the socket representing a stream
3261  * @param data the data buffer from where the data is written into the stream
3262  * @param size the number of bytes to be written from the data buffer
3263  * @param timeout the timeout period
3264  * @param write_cont the function to call upon writing some bytes into the
3265  *          stream 
3266  * @param write_cont_cls the closure
3267  *
3268  * @return handle to cancel the operation; if a previous write is pending or
3269  *           the stream has been shutdown for this operation then write_cont is
3270  *           immediately called and NULL is returned.
3271  */
3272 struct GNUNET_STREAM_IOWriteHandle *
3273 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3274                      const void *data,
3275                      size_t size,
3276                      struct GNUNET_TIME_Relative timeout,
3277                      GNUNET_STREAM_CompletionContinuation write_cont,
3278                      void *write_cont_cls)
3279 {
3280   unsigned int num_needed_packets;
3281   unsigned int packet;
3282   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3283   uint32_t packet_size;
3284   uint32_t payload_size;
3285   struct GNUNET_STREAM_DataMessage *data_msg;
3286   const void *sweep;
3287   struct GNUNET_TIME_Relative ack_deadline;
3288
3289   LOG (GNUNET_ERROR_TYPE_DEBUG,
3290        "%s\n", __func__);
3291
3292   /* Return NULL if there is already a write request pending */
3293   if (NULL != socket->write_handle)
3294   {
3295     GNUNET_break (0);
3296     return NULL;
3297   }
3298
3299   switch (socket->state)
3300   {
3301   case STATE_TRANSMIT_CLOSED:
3302   case STATE_TRANSMIT_CLOSE_WAIT:
3303   case STATE_CLOSED:
3304   case STATE_CLOSE_WAIT:
3305     if (NULL != write_cont)
3306       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3307     LOG (GNUNET_ERROR_TYPE_DEBUG,
3308          "%s() END\n", __func__);
3309     return NULL;
3310   case STATE_INIT:
3311   case STATE_LISTEN:
3312   case STATE_HELLO_WAIT:
3313     if (NULL != write_cont)
3314       /* FIXME: GNUNET_STREAM_SYSERR?? */
3315       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3316     LOG (GNUNET_ERROR_TYPE_DEBUG,
3317          "%s() END\n", __func__);
3318     return NULL;
3319   case STATE_ESTABLISHED:
3320   case STATE_RECEIVE_CLOSED:
3321   case STATE_RECEIVE_CLOSE_WAIT:
3322     break;
3323   }
3324
3325   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3326     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
3327   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
3328   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3329   io_handle->socket = socket;
3330   io_handle->write_cont = write_cont;
3331   io_handle->write_cont_cls = write_cont_cls;
3332   io_handle->size = size;
3333   sweep = data;
3334   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3335      determined from RTT */
3336   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3337   /* Divide the given buffer into packets for sending */
3338   for (packet=0; packet < num_needed_packets; packet++)
3339   {
3340     if ((packet + 1) * max_payload_size < size) 
3341     {
3342       payload_size = max_payload_size;
3343       packet_size = MAX_PACKET_SIZE;
3344     }
3345     else 
3346     {
3347       payload_size = size - packet * max_payload_size;
3348       packet_size =  payload_size + sizeof (struct
3349                                             GNUNET_STREAM_DataMessage); 
3350     }
3351     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3352     io_handle->messages[packet]->header.header.size = htons (packet_size);
3353     io_handle->messages[packet]->header.header.type =
3354       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3355     io_handle->messages[packet]->sequence_number =
3356       htonl (socket->write_sequence_number++);
3357     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3358
3359     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3360        determined from RTT */
3361     io_handle->messages[packet]->ack_deadline =
3362       GNUNET_TIME_relative_hton (ack_deadline);
3363     data_msg = io_handle->messages[packet];
3364     /* Copy data from given buffer to the packet */
3365     memcpy (&data_msg[1],
3366             sweep,
3367             payload_size);
3368     sweep += payload_size;
3369     socket->write_offset += payload_size;
3370   }
3371   socket->write_handle = io_handle;
3372   write_data (socket);
3373
3374   LOG (GNUNET_ERROR_TYPE_DEBUG,
3375        "%s() END\n", __func__);
3376
3377   return io_handle;
3378 }
3379
3380
3381 /**
3382  * Tries to read data from the stream.
3383  *
3384  * @param socket the socket representing a stream
3385  * @param timeout the timeout period
3386  * @param proc function to call with data (once only)
3387  * @param proc_cls the closure for proc
3388  *
3389  * @return handle to cancel the operation; if the stream has been shutdown for
3390  *           this type of opeartion then the DataProcessor is immediately
3391  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3392  */
3393 struct GNUNET_STREAM_IOReadHandle *
3394 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3395                     struct GNUNET_TIME_Relative timeout,
3396                     GNUNET_STREAM_DataProcessor proc,
3397                     void *proc_cls)
3398 {
3399   struct GNUNET_STREAM_IOReadHandle *read_handle;
3400   
3401   LOG (GNUNET_ERROR_TYPE_DEBUG,
3402        "%s: %s()\n", 
3403        GNUNET_i2s (&socket->other_peer),
3404        __func__);
3405
3406   /* Return NULL if there is already a read handle; the user has to cancel that
3407      first before continuing or has to wait until it is completed */
3408   if (NULL != socket->read_handle) return NULL;
3409
3410   GNUNET_assert (NULL != proc);
3411
3412   switch (socket->state)
3413   {
3414   case STATE_RECEIVE_CLOSED:
3415   case STATE_RECEIVE_CLOSE_WAIT:
3416   case STATE_CLOSED:
3417   case STATE_CLOSE_WAIT:
3418     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3419     LOG (GNUNET_ERROR_TYPE_DEBUG,
3420          "%s: %s() END\n",
3421          GNUNET_i2s (&socket->other_peer),
3422          __func__);
3423     return NULL;
3424   default:
3425     break;
3426   }
3427
3428   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3429   read_handle->proc = proc;
3430   read_handle->proc_cls = proc_cls;
3431   socket->read_handle = read_handle;
3432
3433   /* Check if we have a packet at bitmap 0 */
3434   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3435                                           0))
3436   {
3437     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3438                                                      socket);
3439    
3440   }
3441   
3442   /* Setup the read timeout task */
3443   socket->read_io_timeout_task_id =
3444     GNUNET_SCHEDULER_add_delayed (timeout,
3445                                   &read_io_timeout,
3446                                   socket);
3447   LOG (GNUNET_ERROR_TYPE_DEBUG,
3448        "%s: %s() END\n",
3449        GNUNET_i2s (&socket->other_peer),
3450        __func__);
3451   return read_handle;
3452 }
3453
3454
3455 /**
3456  * Cancel pending write operation.
3457  *
3458  * @param ioh handle to operation to cancel
3459  */
3460 void
3461 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3462 {
3463   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3464   unsigned int packet;
3465
3466   GNUNET_assert (NULL != socket->write_handle);
3467   GNUNET_assert (socket->write_handle == ioh);
3468
3469   if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3470   {
3471     GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3472     socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3473   }
3474
3475   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3476   {
3477     if (NULL == ioh->messages[packet]) break;
3478     GNUNET_free (ioh->messages[packet]);
3479   }
3480       
3481   GNUNET_free (socket->write_handle);
3482   socket->write_handle = NULL;
3483 }
3484
3485
3486 /**
3487  * Cancel pending read operation.
3488  *
3489  * @param ioh handle to operation to cancel
3490  */
3491 void
3492 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3493 {
3494   // FIXME: do stuff
3495 }
3496
3497 /* end of stream_api.c */