- warn upon unclean socket close
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_statistics_service.h"
43 #include "gnunet_stream_lib.h"
44 #include "stream_protocol.h"
45
46 /**
47  * Generic logging shorthand
48  */
49 #define LOG(kind,...)                                   \
50   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
51
52 /**
53  * Debug logging shorthand
54  */
55 #define LOG_DEBUG(...)                          \
56   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
57
58 /**
59  * Time in relative seconds shorthand
60  */
61 #define TIME_REL_SECS(sec) \
62   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
63
64 /**
65  * The maximum packet size of a stream packet
66  */
67 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
68
69 /**
70  * Receive buffer
71  */
72 #define RECEIVE_BUFFER_SIZE 4096000
73
74 /**
75  * states in the Protocol
76  */
77 enum State
78   {
79     /**
80      * Client initialization state
81      */
82     STATE_INIT,
83
84     /**
85      * Listener initialization state 
86      */
87     STATE_LISTEN,
88
89     /**
90      * Pre-connection establishment state
91      */
92     STATE_HELLO_WAIT,
93
94     /**
95      * State where a connection has been established
96      */
97     STATE_ESTABLISHED,
98
99     /**
100      * State where the socket is closed on our side and waiting to be ACK'ed
101      */
102     STATE_RECEIVE_CLOSE_WAIT,
103
104     /**
105      * State where the socket is closed for reading
106      */
107     STATE_RECEIVE_CLOSED,
108
109     /**
110      * State where the socket is closed on our side and waiting to be ACK'ed
111      */
112     STATE_TRANSMIT_CLOSE_WAIT,
113
114     /**
115      * State where the socket is closed for writing
116      */
117     STATE_TRANSMIT_CLOSED,
118
119     /**
120      * State where the socket is closed on our side and waiting to be ACK'ed
121      */
122     STATE_CLOSE_WAIT,
123
124     /**
125      * State where the socket is closed
126      */
127     STATE_CLOSED 
128   };
129
130
131 /**
132  * Functions of this type are called when a message is written
133  *
134  * @param cls the closure from queue_message
135  * @param socket the socket the written message was bound to
136  */
137 typedef void (*SendFinishCallback) (void *cls,
138                                     struct GNUNET_STREAM_Socket *socket);
139
140
141 /**
142  * The send message queue
143  */
144 struct MessageQueue
145 {
146   /**
147    * The message
148    */
149   struct GNUNET_STREAM_MessageHeader *message;
150
151   /**
152    * Callback to be called when the message is sent
153    */
154   SendFinishCallback finish_cb;
155
156   /**
157    * The closure for finish_cb
158    */
159   void *finish_cb_cls;
160
161   /**
162    * The next message in queue. Should be NULL in the last message
163    */
164   struct MessageQueue *next;
165
166   /**
167    * The next message in queue. Should be NULL in the first message
168    */
169   struct MessageQueue *prev;
170 };
171
172
173 /**
174  * The STREAM Socket Handler
175  */
176 struct GNUNET_STREAM_Socket
177 {
178   /**
179    * The mesh handle
180    */
181   struct GNUNET_MESH_Handle *mesh;
182
183   /**
184    * Handle to statistics
185    */
186   struct GNUNET_STATISTICS_Handle *stat_handle;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * The shutdown handle associated with this socket
230    */
231   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
232
233   /**
234    * Buffer for storing received messages
235    */
236   void *receive_buffer;
237
238   /**
239    * The listen socket from which this socket is derived. Should be NULL if it
240    * is not a derived socket
241    */
242   struct GNUNET_STREAM_ListenSocket *lsocket;
243
244   /**
245    * The peer identity of the peer at the other end of the stream
246    */
247   struct GNUNET_PeerIdentity other_peer;
248
249   /**
250    * The Acknowledgement Bitmap
251    */
252   GNUNET_STREAM_AckBitmap ack_bitmap;
253
254   /**
255    * Task identifier for retransmission task after timeout
256    */
257   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
258
259   /**
260    * Task identifier for retransmission of control messages
261    */
262   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
263
264   /**
265    * The task for sending timely Acks
266    */
267   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
268
269   /**
270    * Retransmission timeout
271    */
272   struct GNUNET_TIME_Relative retransmit_timeout;
273
274   /**
275    * Time when the Acknowledgement was queued
276    */
277   struct GNUNET_TIME_Absolute ack_time_registered;
278
279   /**
280    * Queued Acknowledgement deadline
281    */
282   struct GNUNET_TIME_Relative ack_time_deadline;
283
284   /**
285    * The state of the protocol associated with this socket
286    */
287   enum State state;
288
289   /**
290    * The status of the socket
291    */
292   enum GNUNET_STREAM_Status status;
293
294   /**
295    * The number of previous timeouts; FIXME: currently not used
296    */
297   unsigned int retries;
298
299   /**
300    * Whether testing mode is active or not
301    */
302   int testing_active;
303
304   /**
305    * Is receive closed
306    */
307   int receive_closed;
308
309   /**
310    * Is transmission closed
311    */
312   int transmit_closed;
313
314   /**
315    * The application port number (type: uint32_t)
316    */
317   GNUNET_MESH_ApplicationType app_port;
318
319   /**
320    * The write sequence number to be set incase of testing
321    */
322   uint32_t testing_set_write_sequence_number_value;
323
324   /**
325    * Write sequence number. Set to random when sending HELLO(client) and
326    * HELLO_ACK(server) 
327    */
328   uint32_t write_sequence_number;
329
330   /**
331    * Read sequence number. This number's value is determined during handshake
332    */
333   uint32_t read_sequence_number;
334
335   /**
336    * The receiver buffer size
337    */
338   uint32_t receive_buffer_size;
339
340   /**
341    * The receiver buffer boundaries
342    */
343   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
344
345   /**
346    * receiver's available buffer after the last acknowledged packet
347    */
348   uint32_t receiver_window_available;
349
350   /**
351    * The offset pointer used during write operation
352    */
353   uint32_t write_offset;
354
355   /**
356    * The offset after which we are expecting data
357    */
358   uint32_t read_offset;
359
360   /**
361    * The offset upto which user has read from the received buffer
362    */
363   uint32_t copy_offset;
364
365   /**
366    * The maximum size of the data message payload this stream handle can send
367    */
368   uint16_t max_payload_size;
369 };
370
371
372 /**
373  * A socket for listening
374  */
375 struct GNUNET_STREAM_ListenSocket
376 {
377   /**
378    * The mesh handle
379    */
380   struct GNUNET_MESH_Handle *mesh;
381
382   /**
383    * Handle to statistics
384    */
385   struct GNUNET_STATISTICS_Handle *stat_handle;
386
387   /**
388    * Our configuration
389    */
390   struct GNUNET_CONFIGURATION_Handle *cfg;
391
392   /**
393    * Handle to the lock manager service
394    */
395   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
396
397   /**
398    * The active LockingRequest from lockmanager
399    */
400   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
401
402   /**
403    * Callback to call after acquring a lock and listening
404    */
405   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
406
407   /**
408    * The callback function which is called after successful opening socket
409    */
410   GNUNET_STREAM_ListenCallback listen_cb;
411
412   /**
413    * The call back closure
414    */
415   void *listen_cb_cls;
416
417   /**
418    * The service port
419    */
420   GNUNET_MESH_ApplicationType port;
421   
422   /**
423    * The id of the lockmanager timeout task
424    */
425   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
426
427   /**
428    * The retransmit timeout
429    */
430   struct GNUNET_TIME_Relative retransmit_timeout;
431   
432   /**
433    * Listen enabled?
434    */
435   int listening;
436
437   /**
438    * Whether testing mode is active or not
439    */
440   int testing_active;
441
442   /**
443    * The write sequence number to be set incase of testing
444    */
445   uint32_t testing_set_write_sequence_number_value;
446
447   /**
448    * The maximum size of the data message payload this stream handle can send
449    */
450   uint16_t max_payload_size;
451
452 };
453
454
455 /**
456  * The IO Write Handle
457  */
458 struct GNUNET_STREAM_IOWriteHandle
459 {
460   /**
461    * The socket to which this write handle is associated
462    */
463   struct GNUNET_STREAM_Socket *socket;
464
465   /**
466    * The packet_buffers associated with this Handle
467    */
468   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
469
470   /**
471    * The write continuation callback
472    */
473   GNUNET_STREAM_CompletionContinuation write_cont;
474
475   /**
476    * Write continuation closure
477    */
478   void *write_cont_cls;
479
480   /**
481    * The bitmap of this IOHandle; Corresponding bit for a message is set when
482    * it has been acknowledged by the receiver
483    */
484   GNUNET_STREAM_AckBitmap ack_bitmap;
485
486   /**
487    * Number of bytes in this write handle
488    */
489   size_t size;
490
491   /**
492    * Number of packets already transmitted from this IO handle. Retransmitted
493    * packets are not taken into account here. This is used to determine which
494    * packets account for retransmission and which packets occupy buffer space at
495    * the receiver.
496    */
497   unsigned int packets_sent;
498 };
499
500
501 /**
502  * The IO Read Handle
503  */
504 struct GNUNET_STREAM_IOReadHandle
505 {
506   /**
507    * The socket to which this read handle is associated
508    */
509   struct GNUNET_STREAM_Socket *socket;
510   
511   /**
512    * Callback for the read processor
513    */
514   GNUNET_STREAM_DataProcessor proc;
515
516   /**
517    * The closure pointer for the read processor callback
518    */
519   void *proc_cls;
520
521   /**
522    * Task identifier for the read io timeout task
523    */
524   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
525
526   /**
527    * Task scheduled to continue a read operation.
528    */
529   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
530 };
531
532
533 /**
534  * Handle for Shutdown
535  */
536 struct GNUNET_STREAM_ShutdownHandle
537 {
538   /**
539    * The socket associated with this shutdown handle
540    */
541   struct GNUNET_STREAM_Socket *socket;
542
543   /**
544    * Shutdown completion callback
545    */
546   GNUNET_STREAM_ShutdownCompletion completion_cb;
547
548   /**
549    * Closure for completion callback
550    */
551   void *completion_cls;
552
553   /**
554    * Close message retransmission task id
555    */
556   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
557
558   /**
559    * Task scheduled to call the shutdown continuation callback
560    */
561   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
562
563   /**
564    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
565    */
566   int operation;  
567 };
568
569
570 /**
571  * Default value in seconds for various timeouts
572  */
573 static const unsigned int default_timeout = 10;
574
575 /**
576  * The domain name for locks we use here
577  */
578 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
579
580
581 /**
582  * Callback function for sending queued message
583  *
584  * @param cls closure the socket
585  * @param size number of bytes available in buf
586  * @param buf where the callee should write the message
587  * @return number of bytes written to buf
588  */
589 static size_t
590 send_message_notify (void *cls, size_t size, void *buf)
591 {
592   struct GNUNET_STREAM_Socket *socket = cls;
593   struct MessageQueue *head;
594   size_t ret;
595
596   socket->transmit_handle = NULL; /* Remove the transmit handle */
597   head = socket->queue_head;
598   if (NULL == head)
599     return 0; /* just to be safe */
600   if (0 == size)                /* request timed out */
601   {
602     socket->retries++;
603     LOG (GNUNET_ERROR_TYPE_DEBUG,
604          "%s: Message sending timed out. Retry %d \n",
605          GNUNET_i2s (&socket->other_peer),
606          socket->retries);
607     socket->transmit_handle = 
608       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
609                                          GNUNET_NO, /* Corking */
610                                          /* FIXME: exponential backoff */
611                                          socket->retransmit_timeout,
612                                          &socket->other_peer,
613                                          ntohs (head->message->header.size),
614                                          &send_message_notify,
615                                          socket);
616     return 0;
617   }
618   ret = ntohs (head->message->header.size);
619   GNUNET_assert (size >= ret);
620   memcpy (buf, head->message, ret);
621   if (NULL != head->finish_cb)
622   {
623     head->finish_cb (head->finish_cb_cls, socket);
624   }
625   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
626                                socket->queue_tail,
627                                head);
628   GNUNET_free (head->message);
629   GNUNET_free (head);
630   head = socket->queue_head;
631   if (NULL != head)    /* more pending messages to send */
632   {
633     socket->retries = 0;
634     socket->transmit_handle = 
635       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
636                                          GNUNET_NO, /* Corking */
637                                          /* FIXME: exponential backoff */
638                                          socket->retransmit_timeout,
639                                          &socket->other_peer,
640                                          ntohs (head->message->header.size),
641                                          &send_message_notify,
642                                          socket);
643   }
644   return ret;
645 }
646
647
648 /**
649  * Queues a message for sending using the mesh connection of a socket
650  *
651  * @param socket the socket whose mesh connection is used
652  * @param message the message to be sent
653  * @param finish_cb the callback to be called when the message is sent
654  * @param finish_cb_cls the closure for the callback
655  * @param urgent set to GNUNET_YES to add the message to the beginning of the
656  *          queue; GNUNET_NO to add at the tail
657  */
658 static void
659 queue_message (struct GNUNET_STREAM_Socket *socket,
660                struct GNUNET_STREAM_MessageHeader *message,
661                SendFinishCallback finish_cb,
662                void *finish_cb_cls,
663                int urgent)
664 {
665   struct MessageQueue *queue_entity;
666
667   GNUNET_assert 
668     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
669      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
670   LOG (GNUNET_ERROR_TYPE_DEBUG,
671        "%s: Queueing message of type %d and size %d\n",
672        GNUNET_i2s (&socket->other_peer),
673        ntohs (message->header.type),
674        ntohs (message->header.size));
675   GNUNET_assert (NULL != message);
676   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
677   queue_entity->message = message;
678   queue_entity->finish_cb = finish_cb;
679   queue_entity->finish_cb_cls = finish_cb_cls;
680   if (GNUNET_YES == urgent)
681   {
682     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
683                                  queue_entity);
684     if (NULL != socket->transmit_handle)
685     {
686       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
687       socket->transmit_handle = NULL;
688     }
689   }
690   else
691     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
692                                       socket->queue_tail,
693                                       queue_entity);
694   if (NULL == socket->transmit_handle)
695   {
696     socket->retries = 0;
697     socket->transmit_handle = 
698         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
699                                            GNUNET_NO, /* Corking */
700                                            socket->retransmit_timeout,
701                                            &socket->other_peer,
702                                            ntohs (message->header.size),
703                                            &send_message_notify,
704                                            socket);
705   }
706 }
707
708
709 /**
710  * Copies a message and queues it for sending using the mesh connection of
711  * given socket 
712  *
713  * @param socket the socket whose mesh connection is used
714  * @param message the message to be sent
715  * @param finish_cb the callback to be called when the message is sent
716  * @param finish_cb_cls the closure for the callback
717  */
718 static void
719 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
720                         const struct GNUNET_STREAM_MessageHeader *message,
721                         SendFinishCallback finish_cb,
722                         void *finish_cb_cls)
723 {
724   struct GNUNET_STREAM_MessageHeader *msg_copy;
725   uint16_t size;
726   
727   size = ntohs (message->header.size);
728   msg_copy = GNUNET_malloc (size);
729   memcpy (msg_copy, message, size);
730   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
731 }
732
733
734 /**
735  * Writes data using the given socket. The amount of data written is limited by
736  * the receiver_window_size
737  *
738  * @param socket the socket to use
739  */
740 static void 
741 write_data (struct GNUNET_STREAM_Socket *socket);
742
743
744 /**
745  * Task for retransmitting data messages if they aren't ACK before their ack
746  * deadline 
747  *
748  * @param cls the socket
749  * @param tc the Task context
750  */
751 static void
752 data_retransmission_task (void *cls,
753                           const struct GNUNET_SCHEDULER_TaskContext *tc)
754 {
755   struct GNUNET_STREAM_Socket *socket = cls;
756   
757   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
758   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
759     return;
760   LOG (GNUNET_ERROR_TYPE_DEBUG,
761        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
762   write_data (socket);
763 }
764
765
766 /**
767  * Task for sending ACK message
768  *
769  * @param cls the socket
770  * @param tc the Task context
771  */
772 static void
773 ack_task (void *cls,
774           const struct GNUNET_SCHEDULER_TaskContext *tc)
775 {
776   struct GNUNET_STREAM_Socket *socket = cls;
777   struct GNUNET_STREAM_AckMessage *ack_msg;
778
779   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
780   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
781     return;
782   /* Create the ACK Message */
783   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
784   ack_msg->header.header.size = htons (sizeof (struct 
785                                                GNUNET_STREAM_AckMessage));
786   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
787   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
788   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
789   ack_msg->receive_window_remaining = 
790     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
791   /* Queue up ACK for immediate sending */
792   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
793 }
794
795
796 /**
797  * Retransmission task for shutdown messages
798  *
799  * @param cls the shutdown handle
800  * @param tc the Task Context
801  */
802 static void
803 close_msg_retransmission_task (void *cls,
804                                const struct GNUNET_SCHEDULER_TaskContext *tc)
805 {
806   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
807   struct GNUNET_STREAM_MessageHeader *msg;
808   struct GNUNET_STREAM_Socket *socket;
809
810   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
811   GNUNET_assert (NULL != shutdown_handle);
812   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
813     return;
814   socket = shutdown_handle->socket;
815   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
816   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
817   switch (shutdown_handle->operation)
818   {
819   case SHUT_RDWR:
820     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
821     break;
822   case SHUT_RD:
823     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
824     break;
825   case SHUT_WR:
826     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
827     break;
828   default:
829     GNUNET_free (msg);
830     shutdown_handle->close_msg_retransmission_task_id = 
831       GNUNET_SCHEDULER_NO_TASK;
832     return;
833   }
834   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
835   shutdown_handle->close_msg_retransmission_task_id =
836     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
837                                   &close_msg_retransmission_task,
838                                   shutdown_handle);
839 }
840
841
842 /**
843  * Function to modify a bit in GNUNET_STREAM_AckBitmap
844  *
845  * @param bitmap the bitmap to modify
846  * @param bit the bit number to modify
847  * @param value GNUNET_YES to on, GNUNET_NO to off
848  */
849 static void
850 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
851                       unsigned int bit, 
852                       int value)
853 {
854   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
855   if (GNUNET_YES == value)
856     *bitmap |= (1LL << bit);
857   else
858     *bitmap &= ~(1LL << bit);
859 }
860
861
862 /**
863  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
864  *
865  * @param bitmap address of the bitmap that has to be checked
866  * @param bit the bit number to check
867  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
868  */
869 static uint8_t
870 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
871                       unsigned int bit)
872 {
873   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
874   return 0 != (*bitmap & (1LL << bit));
875 }
876
877
878 /**
879  * Writes data using the given socket. The amount of data written is limited by
880  * the receiver_window_size
881  *
882  * @param socket the socket to use
883  */
884 static void 
885 write_data (struct GNUNET_STREAM_Socket *socket)
886 {
887   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
888   unsigned int packet;
889   
890   for (packet=0; packet < io_handle->packets_sent; packet++)
891   {
892     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
893                                            packet))
894     {
895       LOG (GNUNET_ERROR_TYPE_DEBUG,
896            "%s: Retransmitting DATA message with sequence %u\n",
897            GNUNET_i2s (&socket->other_peer),
898            ntohl (io_handle->messages[packet]->sequence_number));
899       copy_and_queue_message (socket,
900                               &io_handle->messages[packet]->header,
901                               NULL,
902                               NULL);
903     }
904   }
905   /* Now send new packets if there is enough buffer space */
906   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
907          (NULL != io_handle->messages[packet]) &&
908          (socket->receiver_window_available 
909           >= ntohs (io_handle->messages[packet]->header.header.size)))
910   {
911     socket->receiver_window_available -= 
912       ntohs (io_handle->messages[packet]->header.header.size);
913     LOG (GNUNET_ERROR_TYPE_DEBUG,
914          "%s: Placing DATA message with sequence %u in send queue\n",
915          GNUNET_i2s (&socket->other_peer),
916          ntohl (io_handle->messages[packet]->sequence_number));
917     copy_and_queue_message (socket,
918                             &io_handle->messages[packet]->header,
919                             NULL,
920                             NULL);
921     packet++;
922   }
923   io_handle->packets_sent = packet;
924   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
925   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
926     socket->data_retransmission_task_id = 
927       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
928                                     (GNUNET_TIME_UNIT_SECONDS, 8),
929                                     &data_retransmission_task,
930                                     socket);
931 }
932
933
934 /**
935  * Task for calling the read processor
936  *
937  * @param cls the socket
938  * @param tc the task context
939  */
940 static void
941 call_read_processor (void *cls,
942                      const struct GNUNET_SCHEDULER_TaskContext *tc)
943 {
944   struct GNUNET_STREAM_Socket *socket = cls;
945   struct GNUNET_STREAM_IOReadHandle *read_handle;
946   size_t read_size;
947   size_t valid_read_size;
948   unsigned int packet;
949   uint32_t sequence_increase;
950   uint32_t offset_increase;
951
952   read_handle = socket->read_handle;
953   GNUNET_assert (NULL != read_handle);
954   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
955   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
956     return;
957   if (NULL == socket->receive_buffer) 
958     return;
959   GNUNET_assert (NULL != socket->read_handle);
960   GNUNET_assert (NULL != socket->read_handle->proc);
961   /* Check the bitmap for any holes */
962   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
963   {
964     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
965                                            packet))
966       break;
967   }
968   /* We only call read processor if we have the first packet */
969   GNUNET_assert (0 < packet);
970   valid_read_size = 
971     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
972   GNUNET_assert (0 != valid_read_size);
973   /* Cancel the read_io_timeout_task */
974   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
975   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
976   /* Call the data processor */
977   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
978        GNUNET_i2s (&socket->other_peer));
979   read_size = 
980       socket->read_handle->proc (socket->read_handle->proc_cls,
981                                  socket->status,
982                                  socket->receive_buffer + socket->copy_offset,
983                                  valid_read_size);
984   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
985        GNUNET_i2s (&socket->other_peer), read_size);
986   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
987        GNUNET_i2s (&socket->other_peer));
988   /* Free the read handle */
989   GNUNET_free (socket->read_handle);
990   socket->read_handle = NULL;
991   GNUNET_assert (read_size <= valid_read_size);
992   socket->copy_offset += read_size;
993   /* Determine upto which packet we can remove from the buffer */
994   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
995   {
996     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
997     { 
998       packet++; 
999       break;
1000     }
1001     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1002       break;
1003   }
1004   /* If no packets can be removed we can't move the buffer */
1005   if (0 == packet) 
1006     return;
1007   sequence_increase = packet;
1008   LOG (GNUNET_ERROR_TYPE_DEBUG,
1009        "%s: Sequence increase after read processor completion: %u\n",
1010        GNUNET_i2s (&socket->other_peer), sequence_increase);
1011   /* Shift the data in the receive buffer */
1012   socket->receive_buffer = 
1013     memmove (socket->receive_buffer,
1014              socket->receive_buffer 
1015              + socket->receive_buffer_boundaries[sequence_increase-1],
1016              socket->receive_buffer_size
1017              - socket->receive_buffer_boundaries[sequence_increase-1]);
1018   /* Shift the bitmap */
1019   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1020   /* Set read_sequence_number */
1021   socket->read_sequence_number += sequence_increase;
1022   /* Set read_offset */
1023   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1024   socket->read_offset += offset_increase;
1025   /* Fix copy_offset */
1026   GNUNET_assert (offset_increase <= socket->copy_offset);
1027   socket->copy_offset -= offset_increase;
1028   /* Fix relative boundaries */
1029   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1030   {
1031     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1032     {
1033       uint32_t ahead_buffer_boundary;
1034
1035       ahead_buffer_boundary = 
1036         socket->receive_buffer_boundaries[packet + sequence_increase];
1037       if (0 == ahead_buffer_boundary)
1038         socket->receive_buffer_boundaries[packet] = 0;
1039       else
1040       {
1041         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1042         socket->receive_buffer_boundaries[packet] = 
1043           ahead_buffer_boundary - offset_increase;
1044       }
1045     }
1046     else
1047       socket->receive_buffer_boundaries[packet] = 0;
1048   }
1049 }
1050
1051
1052 /**
1053  * Cancels the existing read io handle
1054  *
1055  * @param cls the closure from the SCHEDULER call
1056  * @param tc the task context
1057  */
1058 static void
1059 read_io_timeout (void *cls,
1060                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1061 {
1062   struct GNUNET_STREAM_Socket *socket = cls;
1063   struct GNUNET_STREAM_IOReadHandle *read_handle;
1064   GNUNET_STREAM_DataProcessor proc;
1065   void *proc_cls;
1066
1067   read_handle = socket->read_handle;
1068   GNUNET_assert (NULL != read_handle);
1069   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1070   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1071     return;
1072   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1073   {
1074     LOG (GNUNET_ERROR_TYPE_DEBUG,
1075          "%s: Read task timedout - Cancelling it\n",
1076          GNUNET_i2s (&socket->other_peer));
1077     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1078     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1079   }
1080   proc = read_handle->proc;
1081   proc_cls = read_handle->proc_cls;
1082   GNUNET_free (read_handle);
1083   socket->read_handle = NULL;
1084   /* Call the read processor to signal timeout */
1085   proc (proc_cls,
1086         GNUNET_STREAM_TIMEOUT,
1087         NULL,
1088         0);
1089 }
1090
1091
1092 /**
1093  * Handler for DATA messages; Same for both client and server
1094  *
1095  * @param socket the socket through which the ack was received
1096  * @param tunnel connection to the other end
1097  * @param sender who sent the message
1098  * @param msg the data message
1099  * @param atsi performance data for the connection
1100  * @return GNUNET_OK to keep the connection open,
1101  *         GNUNET_SYSERR to close it (signal serious error)
1102  */
1103 static int
1104 handle_data (struct GNUNET_STREAM_Socket *socket,
1105              struct GNUNET_MESH_Tunnel *tunnel,
1106              const struct GNUNET_PeerIdentity *sender,
1107              const struct GNUNET_STREAM_DataMessage *msg,
1108              const struct GNUNET_ATS_Information*atsi)
1109 {
1110   const void *payload;
1111   struct GNUNET_TIME_Relative ack_deadline_rel;
1112   uint32_t bytes_needed;
1113   uint32_t relative_offset;
1114   uint32_t relative_sequence_number;
1115   uint16_t size;
1116
1117   size = htons (msg->header.header.size);
1118   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1119   {
1120     GNUNET_break_op (0);
1121     return GNUNET_SYSERR;
1122   }
1123   if (0 != memcmp (sender, &socket->other_peer,
1124                    sizeof (struct GNUNET_PeerIdentity)))
1125   {
1126     LOG (GNUNET_ERROR_TYPE_DEBUG,
1127          "%s: Received DATA from non-confirming peer\n",
1128          GNUNET_i2s (&socket->other_peer));
1129     return GNUNET_YES;
1130   }
1131   switch (socket->state)
1132   {
1133   case STATE_ESTABLISHED:
1134   case STATE_TRANSMIT_CLOSED:
1135   case STATE_TRANSMIT_CLOSE_WAIT:      
1136     /* check if the message's sequence number is in the range we are
1137        expecting */
1138     relative_sequence_number = 
1139       ntohl (msg->sequence_number) - socket->read_sequence_number;
1140     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1141     {
1142       LOG (GNUNET_ERROR_TYPE_DEBUG,
1143            "%s: Ignoring received message with sequence number %u\n",
1144            GNUNET_i2s (&socket->other_peer),
1145            ntohl (msg->sequence_number));
1146       /* Start ACK sending task if one is not already present */
1147       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1148       {
1149         socket->ack_task_id = 
1150           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1151                                         (msg->ack_deadline),
1152                                         &ack_task,
1153                                         socket);
1154       }
1155       return GNUNET_YES;
1156     }      
1157     /* Check if we have already seen this message */
1158     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1159                                             relative_sequence_number))
1160     {
1161       LOG (GNUNET_ERROR_TYPE_DEBUG,
1162            "%s: Ignoring already received message with sequence number %u\n",
1163            GNUNET_i2s (&socket->other_peer),
1164            ntohl (msg->sequence_number));
1165       /* Start ACK sending task if one is not already present */
1166       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1167       {
1168         socket->ack_task_id = 
1169           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1170                                         (msg->ack_deadline), &ack_task, socket);
1171       }
1172       return GNUNET_YES;
1173     }
1174     LOG (GNUNET_ERROR_TYPE_DEBUG,
1175          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1176          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1177          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1178     /* Check if we have to allocate the buffer */
1179     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1180     relative_offset = ntohl (msg->offset) - socket->read_offset;
1181     bytes_needed = relative_offset + size;
1182     if (bytes_needed > socket->receive_buffer_size)
1183     {
1184       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1185       {
1186         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1187                                                  bytes_needed);
1188         socket->receive_buffer_size = bytes_needed;
1189       }
1190       else
1191       {
1192         LOG (GNUNET_ERROR_TYPE_DEBUG,
1193              "%s: Cannot accommodate packet %d as buffer is full\n",
1194              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1195         return GNUNET_YES;
1196       }
1197     }
1198     /* Copy Data to buffer */
1199     payload = &msg[1];
1200     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1201     memcpy (socket->receive_buffer + relative_offset, payload, size);
1202     socket->receive_buffer_boundaries[relative_sequence_number] = 
1203         relative_offset + size;
1204     /* Modify the ACK bitmap */
1205     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1206                           GNUNET_YES);
1207     /* Start ACK sending task if one is not already present */
1208     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1209     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1210     {
1211       ack_deadline_rel = 
1212           GNUNET_TIME_relative_min (ack_deadline_rel,
1213                                     GNUNET_TIME_relative_multiply
1214                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1215       socket->ack_task_id = 
1216           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1217                                         (msg->ack_deadline), &ack_task, socket);
1218       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1219       socket->ack_time_deadline = ack_deadline_rel;
1220     }
1221     else
1222     {
1223       struct GNUNET_TIME_Relative ack_time_past;
1224       struct GNUNET_TIME_Relative ack_time_remaining;
1225       struct GNUNET_TIME_Relative ack_time_min;
1226       ack_time_past = 
1227           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1228       ack_time_remaining = GNUNET_TIME_relative_subtract
1229           (socket->ack_time_deadline, ack_time_past);
1230       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1231                                                ack_deadline_rel);
1232       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1233                       sizeof (struct GNUNET_TIME_Relative)))
1234       {
1235         ack_deadline_rel = ack_time_min;
1236         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1237         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1238                                                             &ack_task, socket);
1239         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1240         socket->ack_time_deadline = ack_deadline_rel;
1241       }
1242     }
1243     if ((NULL != socket->read_handle) /* A read handle is waiting */
1244         /* There is no current read task */
1245         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1246         /* We have the first packet */
1247         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1248     {
1249       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1250            GNUNET_i2s (&socket->other_peer));
1251       socket->read_handle->read_task_id =
1252           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1253     }
1254     break;
1255   default:
1256     LOG (GNUNET_ERROR_TYPE_DEBUG,
1257          "%s: Received data message when it cannot be handled\n",
1258          GNUNET_i2s (&socket->other_peer));
1259     break;
1260   }
1261   return GNUNET_YES;
1262 }
1263
1264
1265 /**
1266  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1267  *
1268  * @param cls the socket (set from GNUNET_MESH_connect)
1269  * @param tunnel connection to the other end
1270  * @param tunnel_ctx place to store local state associated with the tunnel
1271  * @param sender who sent the message
1272  * @param message the actual message
1273  * @param atsi performance data for the connection
1274  * @return GNUNET_OK to keep the connection open,
1275  *         GNUNET_SYSERR to close it (signal serious error)
1276  */
1277 static int
1278 client_handle_data (void *cls,
1279                     struct GNUNET_MESH_Tunnel *tunnel,
1280                     void **tunnel_ctx,
1281                     const struct GNUNET_PeerIdentity *sender,
1282                     const struct GNUNET_MessageHeader *message,
1283                     const struct GNUNET_ATS_Information*atsi)
1284 {
1285   struct GNUNET_STREAM_Socket *socket = cls;
1286
1287   return handle_data (socket, tunnel, sender, 
1288                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1289 }
1290
1291
1292 /**
1293  * Callback to set state to ESTABLISHED
1294  *
1295  * @param cls the closure NULL;
1296  * @param socket the socket to requiring state change
1297  */
1298 static void
1299 set_state_established (void *cls,
1300                        struct GNUNET_STREAM_Socket *socket)
1301 {
1302   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1303        "%s: Attaining ESTABLISHED state\n",
1304        GNUNET_i2s (&socket->other_peer));
1305   socket->write_offset = 0;
1306   socket->read_offset = 0;
1307   socket->state = STATE_ESTABLISHED;
1308   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1309                  socket->control_retransmission_task_id);
1310   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1311   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1312   if (NULL != socket->lsocket)
1313   {
1314     LOG (GNUNET_ERROR_TYPE_DEBUG,
1315          "%s: Calling listen callback\n",
1316          GNUNET_i2s (&socket->other_peer));
1317     if (GNUNET_SYSERR == 
1318         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1319                                     socket,
1320                                     &socket->other_peer))
1321     {
1322       socket->state = STATE_CLOSED;
1323       /* FIXME: We should close in a decent way (send RST) */
1324       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1325       GNUNET_free (socket);
1326     }
1327   }
1328   else
1329     socket->open_cb (socket->open_cls, socket);
1330 }
1331
1332
1333 /**
1334  * Callback to set state to HELLO_WAIT
1335  *
1336  * @param cls the closure from queue_message
1337  * @param socket the socket to requiring state change
1338  */
1339 static void
1340 set_state_hello_wait (void *cls,
1341                       struct GNUNET_STREAM_Socket *socket)
1342 {
1343   GNUNET_assert (STATE_INIT == socket->state);
1344   LOG (GNUNET_ERROR_TYPE_DEBUG,
1345        "%s: Attaining HELLO_WAIT state\n",
1346        GNUNET_i2s (&socket->other_peer));
1347   socket->state = STATE_HELLO_WAIT;
1348 }
1349
1350
1351 /**
1352  * Callback to set state to CLOSE_WAIT
1353  *
1354  * @param cls the closure from queue_message
1355  * @param socket the socket requiring state change
1356  */
1357 static void
1358 set_state_close_wait (void *cls,
1359                       struct GNUNET_STREAM_Socket *socket)
1360 {
1361   LOG (GNUNET_ERROR_TYPE_DEBUG,
1362        "%s: Attaing CLOSE_WAIT state\n",
1363        GNUNET_i2s (&socket->other_peer));
1364   socket->state = STATE_CLOSE_WAIT;
1365   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1366   socket->receive_buffer = NULL;
1367   socket->receive_buffer_size = 0;
1368 }
1369
1370
1371 /**
1372  * Callback to set state to RECEIVE_CLOSE_WAIT
1373  *
1374  * @param cls the closure from queue_message
1375  * @param socket the socket requiring state change
1376  */
1377 static void
1378 set_state_receive_close_wait (void *cls,
1379                               struct GNUNET_STREAM_Socket *socket)
1380 {
1381   LOG (GNUNET_ERROR_TYPE_DEBUG,
1382        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1383        GNUNET_i2s (&socket->other_peer));
1384   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1385   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1386   socket->receive_buffer = NULL;
1387   socket->receive_buffer_size = 0;
1388 }
1389
1390
1391 /**
1392  * Callback to set state to TRANSMIT_CLOSE_WAIT
1393  *
1394  * @param cls the closure from queue_message
1395  * @param socket the socket requiring state change
1396  */
1397 static void
1398 set_state_transmit_close_wait (void *cls,
1399                                struct GNUNET_STREAM_Socket *socket)
1400 {
1401   LOG (GNUNET_ERROR_TYPE_DEBUG,
1402        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1403        GNUNET_i2s (&socket->other_peer));
1404   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1405 }
1406
1407
1408 /**
1409  * Callback to set state to CLOSED
1410  *
1411  * @param cls the closure from queue_message
1412  * @param socket the socket requiring state change
1413  */
1414 static void
1415 set_state_closed (void *cls,
1416                   struct GNUNET_STREAM_Socket *socket)
1417 {
1418   socket->state = STATE_CLOSED;
1419 }
1420
1421
1422 /**
1423  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1424  *
1425  * @return the generate hello message
1426  */
1427 static struct GNUNET_STREAM_MessageHeader *
1428 generate_hello (void)
1429 {
1430   struct GNUNET_STREAM_MessageHeader *msg;
1431
1432   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1433   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1434   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1435   return msg;
1436 }
1437
1438
1439 /**
1440  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1441  * socket
1442  *
1443  * @param socket the socket for which this HelloAckMessage has to be generated
1444  * @param generate_seq GNUNET_YES to generate the write sequence number,
1445  *          GNUNET_NO to use the existing sequence number
1446  * @return the HelloAckMessage
1447  */
1448 static struct GNUNET_STREAM_HelloAckMessage *
1449 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1450                     int generate_seq)
1451 {
1452   struct GNUNET_STREAM_HelloAckMessage *msg;
1453
1454   if (GNUNET_YES == generate_seq)
1455   {
1456     if (GNUNET_YES == socket->testing_active)
1457       socket->write_sequence_number =
1458         socket->testing_set_write_sequence_number_value;
1459     else
1460       socket->write_sequence_number = 
1461         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1462     LOG_DEBUG ("%s: write sequence number %u\n",
1463                GNUNET_i2s (&socket->other_peer),
1464                (unsigned int) socket->write_sequence_number);
1465   }
1466   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1467   msg->header.header.size = 
1468     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1469   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1470   msg->sequence_number = htonl (socket->write_sequence_number);
1471   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1472   return msg;
1473 }
1474
1475
1476 /**
1477  * Task for retransmitting control messages if they aren't ACK'ed before a
1478  * deadline
1479  *
1480  * @param cls the socket
1481  * @param tc the Task context
1482  */
1483 static void
1484 control_retransmission_task (void *cls,
1485                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1486 {
1487   struct GNUNET_STREAM_Socket *socket = cls;
1488     
1489   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1490   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1491     return;
1492   LOG_DEBUG ("%s: Retransmitting a control message\n",
1493                  GNUNET_i2s (&socket->other_peer));
1494   switch (socket->state)
1495   {
1496   case STATE_INIT:    
1497     GNUNET_break (0);
1498     break;
1499   case STATE_LISTEN:
1500     GNUNET_break (0);
1501     break;
1502   case STATE_HELLO_WAIT:
1503     if (NULL == socket->lsocket) /* We are client */
1504       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1505     else
1506       queue_message (socket,
1507                      (struct GNUNET_STREAM_MessageHeader *)
1508                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1509                      GNUNET_NO);
1510     socket->control_retransmission_task_id =
1511     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1512                                   &control_retransmission_task, socket);
1513     break;
1514   case STATE_ESTABLISHED:
1515     if (NULL == socket->lsocket)
1516       queue_message (socket,
1517                      (struct GNUNET_STREAM_MessageHeader *)
1518                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1519                      GNUNET_NO);
1520     else
1521       GNUNET_break (0);
1522     break;
1523   default:
1524     GNUNET_break (0);
1525   }  
1526 }
1527
1528
1529 /**
1530  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1531  *
1532  * @param cls the socket (set from GNUNET_MESH_connect)
1533  * @param tunnel connection to the other end
1534  * @param tunnel_ctx this is NULL
1535  * @param sender who sent the message
1536  * @param message the actual message
1537  * @param atsi performance data for the connection
1538  * @return GNUNET_OK to keep the connection open,
1539  *         GNUNET_SYSERR to close it (signal serious error)
1540  */
1541 static int
1542 client_handle_hello_ack (void *cls,
1543                          struct GNUNET_MESH_Tunnel *tunnel,
1544                          void **tunnel_ctx,
1545                          const struct GNUNET_PeerIdentity *sender,
1546                          const struct GNUNET_MessageHeader *message,
1547                          const struct GNUNET_ATS_Information*atsi)
1548 {
1549   struct GNUNET_STREAM_Socket *socket = cls;
1550   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1551   struct GNUNET_STREAM_HelloAckMessage *reply;
1552
1553   if (0 != memcmp (sender, &socket->other_peer,
1554                    sizeof (struct GNUNET_PeerIdentity)))
1555   {
1556     LOG (GNUNET_ERROR_TYPE_DEBUG,
1557          "%s: Received HELLO_ACK from non-confirming peer\n",
1558          GNUNET_i2s (&socket->other_peer));
1559     return GNUNET_YES;
1560   }
1561   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1562   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1563        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1564   GNUNET_assert (socket->tunnel == tunnel);
1565   switch (socket->state)
1566   {
1567   case STATE_HELLO_WAIT:
1568     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1569     LOG (GNUNET_ERROR_TYPE_DEBUG,
1570          "%s: Read sequence number %u\n",
1571          GNUNET_i2s (&socket->other_peer),
1572          (unsigned int) socket->read_sequence_number);
1573     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1574     reply = generate_hello_ack (socket, GNUNET_YES);
1575     queue_message (socket, &reply->header, &set_state_established,
1576                    NULL, GNUNET_NO);    
1577     return GNUNET_OK;
1578   case STATE_ESTABLISHED:
1579     // call statistics (# ACKs ignored++)
1580     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1581                    socket->control_retransmission_task_id);
1582     socket->control_retransmission_task_id =
1583       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1584     return GNUNET_OK;
1585   default:
1586     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1587                GNUNET_i2s (&socket->other_peer),
1588                GNUNET_i2s (&socket->other_peer), socket->state);
1589     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1590     return GNUNET_SYSERR;
1591   }
1592 }
1593
1594
1595 /**
1596  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1597  *
1598  * @param cls the socket (set from GNUNET_MESH_connect)
1599  * @param tunnel connection to the other end
1600  * @param tunnel_ctx this is NULL
1601  * @param sender who sent the message
1602  * @param message the actual message
1603  * @param atsi performance data for the connection
1604  * @return GNUNET_OK to keep the connection open,
1605  *         GNUNET_SYSERR to close it (signal serious error)
1606  */
1607 static int
1608 client_handle_reset (void *cls,
1609                      struct GNUNET_MESH_Tunnel *tunnel,
1610                      void **tunnel_ctx,
1611                      const struct GNUNET_PeerIdentity *sender,
1612                      const struct GNUNET_MessageHeader *message,
1613                      const struct GNUNET_ATS_Information*atsi)
1614 {
1615   // struct GNUNET_STREAM_Socket *socket = cls;
1616
1617   return GNUNET_OK;
1618 }
1619
1620
1621 /**
1622  * Common message handler for handling TRANSMIT_CLOSE messages
1623  *
1624  * @param socket the socket through which the ack was received
1625  * @param tunnel connection to the other end
1626  * @param sender who sent the message
1627  * @param msg the transmit close message
1628  * @param atsi performance data for the connection
1629  * @return GNUNET_OK to keep the connection open,
1630  *         GNUNET_SYSERR to close it (signal serious error)
1631  */
1632 static int
1633 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1634                        struct GNUNET_MESH_Tunnel *tunnel,
1635                        const struct GNUNET_PeerIdentity *sender,
1636                        const struct GNUNET_STREAM_MessageHeader *msg,
1637                        const struct GNUNET_ATS_Information*atsi)
1638 {
1639   struct GNUNET_STREAM_MessageHeader *reply;
1640
1641   switch (socket->state)
1642   {
1643   case STATE_INIT:
1644   case STATE_LISTEN:
1645   case STATE_HELLO_WAIT:
1646     LOG (GNUNET_ERROR_TYPE_DEBUG,
1647          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1648          GNUNET_i2s (&socket->other_peer));
1649     return GNUNET_OK;
1650   default:
1651     break;
1652   }
1653   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1654        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1655   socket->receive_closed = GNUNET_YES;
1656   if (GNUNET_YES == socket->transmit_closed)
1657     socket->state = STATE_CLOSED;
1658   else
1659     socket->state = STATE_RECEIVE_CLOSED;
1660   /* Send TRANSMIT_CLOSE_ACK */
1661   reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1662   reply->header.type = 
1663       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1664   reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1665   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1666   return GNUNET_OK;
1667 }
1668
1669
1670 /**
1671  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1672  *
1673  * @param cls the socket (set from GNUNET_MESH_connect)
1674  * @param tunnel connection to the other end
1675  * @param tunnel_ctx this is NULL
1676  * @param sender who sent the message
1677  * @param message the actual message
1678  * @param atsi performance data for the connection
1679  * @return GNUNET_OK to keep the connection open,
1680  *         GNUNET_SYSERR to close it (signal serious error)
1681  */
1682 static int
1683 client_handle_transmit_close (void *cls,
1684                               struct GNUNET_MESH_Tunnel *tunnel,
1685                               void **tunnel_ctx,
1686                               const struct GNUNET_PeerIdentity *sender,
1687                               const struct GNUNET_MessageHeader *message,
1688                               const struct GNUNET_ATS_Information*atsi)
1689 {
1690   struct GNUNET_STREAM_Socket *socket = cls;
1691   
1692   return handle_transmit_close (socket,
1693                                 tunnel,
1694                                 sender,
1695                                 (struct GNUNET_STREAM_MessageHeader *)message,
1696                                 atsi);
1697 }
1698
1699
1700 /**
1701  * Task for calling the shutdown continuation callback
1702  *
1703  * @param cls the socket
1704  * @param tc the scheduler task context
1705  */
1706 static void
1707 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1708 {
1709   struct GNUNET_STREAM_Socket *socket = cls;
1710   
1711   GNUNET_assert (NULL != socket->shutdown_handle);
1712   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1713   if (NULL != socket->shutdown_handle->completion_cb)
1714     socket->shutdown_handle->completion_cb
1715         (socket->shutdown_handle->completion_cls,
1716          socket->shutdown_handle->operation);
1717   GNUNET_free (socket->shutdown_handle);
1718   socket->shutdown_handle = NULL;
1719 }
1720
1721
1722 /**
1723  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1724  *
1725  * @param socket the socket
1726  * @param tunnel connection to the other end
1727  * @param sender who sent the message
1728  * @param message the actual message
1729  * @param atsi performance data for the connection
1730  * @param operation the close operation which is being ACK'ed
1731  * @return GNUNET_OK to keep the connection open,
1732  *         GNUNET_SYSERR to close it (signal serious error)
1733  */
1734 static int
1735 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1736                           struct GNUNET_MESH_Tunnel *tunnel,
1737                           const struct GNUNET_PeerIdentity *sender,
1738                           const struct GNUNET_STREAM_MessageHeader *message,
1739                           const struct GNUNET_ATS_Information *atsi,
1740                           int operation)
1741 {
1742   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1743
1744   shutdown_handle = socket->shutdown_handle;
1745   if (NULL == shutdown_handle)
1746   {
1747     /* This may happen when the shudown handle is cancelled */
1748     LOG (GNUNET_ERROR_TYPE_DEBUG,
1749          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1750          GNUNET_i2s (&socket->other_peer));
1751     return GNUNET_OK;
1752   }
1753   switch (operation)
1754   {
1755   case SHUT_RDWR:
1756     switch (socket->state)
1757     {
1758     case STATE_CLOSE_WAIT:
1759       if (SHUT_RDWR != shutdown_handle->operation)
1760       {
1761         LOG (GNUNET_ERROR_TYPE_DEBUG,
1762              "%s: Received CLOSE_ACK when shutdown handle is not for "
1763              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1764         return GNUNET_OK;
1765       }
1766       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1767            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1768       socket->state = STATE_CLOSED;
1769       break;
1770     default:
1771       LOG (GNUNET_ERROR_TYPE_DEBUG,
1772            "%s: Received CLOSE_ACK when it is not expected\n",
1773            GNUNET_i2s (&socket->other_peer));
1774       return GNUNET_OK;
1775     }
1776     break;
1777   case SHUT_RD:
1778     switch (socket->state)
1779     {
1780     case STATE_RECEIVE_CLOSE_WAIT:
1781       if (SHUT_RD != shutdown_handle->operation)
1782       {
1783         LOG (GNUNET_ERROR_TYPE_DEBUG,
1784              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1785              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1786         return GNUNET_OK;
1787       }
1788       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1789            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1790       socket->state = STATE_RECEIVE_CLOSED;
1791       break;
1792     default:
1793       LOG (GNUNET_ERROR_TYPE_DEBUG,
1794            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1795            GNUNET_i2s (&socket->other_peer));
1796       return GNUNET_OK;
1797     }
1798     break;
1799   case SHUT_WR:
1800     switch (socket->state)
1801     {
1802     case STATE_TRANSMIT_CLOSE_WAIT:
1803       if (SHUT_WR != shutdown_handle->operation)
1804       {
1805         LOG (GNUNET_ERROR_TYPE_DEBUG,
1806              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1807              "is not for SHUT_WR\n",
1808              GNUNET_i2s (&socket->other_peer));
1809         return GNUNET_OK;
1810       }
1811       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1812            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1813       socket->state = STATE_TRANSMIT_CLOSED;
1814       break;
1815     default:
1816       LOG (GNUNET_ERROR_TYPE_DEBUG,
1817            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1818            GNUNET_i2s (&socket->other_peer));          
1819       return GNUNET_OK;
1820     }
1821     break;
1822   default:
1823     GNUNET_assert (0);
1824   }
1825   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1826       (&call_cont_task, socket);
1827   if (GNUNET_SCHEDULER_NO_TASK
1828       != shutdown_handle->close_msg_retransmission_task_id)
1829   {
1830     GNUNET_SCHEDULER_cancel
1831       (shutdown_handle->close_msg_retransmission_task_id);
1832     shutdown_handle->close_msg_retransmission_task_id =
1833         GNUNET_SCHEDULER_NO_TASK;
1834   }
1835   return GNUNET_OK;
1836 }
1837
1838
1839 /**
1840  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1841  *
1842  * @param cls the socket (set from GNUNET_MESH_connect)
1843  * @param tunnel connection to the other end
1844  * @param tunnel_ctx this is NULL
1845  * @param sender who sent the message
1846  * @param message the actual message
1847  * @param atsi performance data for the connection
1848  * @return GNUNET_OK to keep the connection open,
1849  *         GNUNET_SYSERR to close it (signal serious error)
1850  */
1851 static int
1852 client_handle_transmit_close_ack (void *cls,
1853                                   struct GNUNET_MESH_Tunnel *tunnel,
1854                                   void **tunnel_ctx,
1855                                   const struct GNUNET_PeerIdentity *sender,
1856                                   const struct GNUNET_MessageHeader *message,
1857                                   const struct GNUNET_ATS_Information*atsi)
1858 {
1859   struct GNUNET_STREAM_Socket *socket = cls;
1860
1861   return handle_generic_close_ack (socket,
1862                                    tunnel,
1863                                    sender,
1864                                    (const struct GNUNET_STREAM_MessageHeader *)
1865                                    message,
1866                                    atsi,
1867                                    SHUT_WR);
1868 }
1869
1870
1871 /**
1872  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1873  *
1874  * @param socket the socket
1875  * @param tunnel connection to the other end
1876  * @param sender who sent the message
1877  * @param message the actual message
1878  * @param atsi performance data for the connection
1879  * @return GNUNET_OK to keep the connection open,
1880  *         GNUNET_SYSERR to close it (signal serious error)
1881  */
1882 static int
1883 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1884                       struct GNUNET_MESH_Tunnel *tunnel,
1885                       const struct GNUNET_PeerIdentity *sender,
1886                       const struct GNUNET_STREAM_MessageHeader *message,
1887                       const struct GNUNET_ATS_Information *atsi)
1888 {
1889   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1890
1891   switch (socket->state)
1892   {
1893   case STATE_INIT:
1894   case STATE_LISTEN:
1895   case STATE_HELLO_WAIT:
1896     LOG (GNUNET_ERROR_TYPE_DEBUG,
1897          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1898          GNUNET_i2s (&socket->other_peer));
1899     return GNUNET_OK;
1900   default:
1901     break;
1902   }  
1903   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1904        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1905   socket->transmit_closed = GNUNET_YES;
1906   if (GNUNET_YES == socket->receive_closed)
1907     socket->state = STATE_CLOSED;
1908   else
1909     socket->state = STATE_TRANSMIT_CLOSED;
1910   receive_close_ack =
1911     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1912   receive_close_ack->header.size =
1913     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1914   receive_close_ack->header.type =
1915     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1916   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1917   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1918      that that stream has been shutdown */
1919   if (NULL != socket->write_handle)
1920   {
1921     if (NULL != socket->write_handle->write_cont)
1922       socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
1923                                         GNUNET_STREAM_SHUTDOWN, 0);
1924     GNUNET_STREAM_io_write_cancel (socket->write_handle);
1925     socket->write_handle = NULL;
1926   }
1927   return GNUNET_OK;
1928 }
1929
1930
1931 /**
1932  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1933  *
1934  * @param cls the socket (set from GNUNET_MESH_connect)
1935  * @param tunnel connection to the other end
1936  * @param tunnel_ctx this is NULL
1937  * @param sender who sent the message
1938  * @param message the actual message
1939  * @param atsi performance data for the connection
1940  * @return GNUNET_OK to keep the connection open,
1941  *         GNUNET_SYSERR to close it (signal serious error)
1942  */
1943 static int
1944 client_handle_receive_close (void *cls,
1945                              struct GNUNET_MESH_Tunnel *tunnel,
1946                              void **tunnel_ctx,
1947                              const struct GNUNET_PeerIdentity *sender,
1948                              const struct GNUNET_MessageHeader *message,
1949                              const struct GNUNET_ATS_Information*atsi)
1950 {
1951   struct GNUNET_STREAM_Socket *socket = cls;
1952
1953   return
1954     handle_receive_close (socket,
1955                           tunnel,
1956                           sender,
1957                           (const struct GNUNET_STREAM_MessageHeader *) message,
1958                           atsi);
1959 }
1960
1961
1962 /**
1963  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1964  *
1965  * @param cls the socket (set from GNUNET_MESH_connect)
1966  * @param tunnel connection to the other end
1967  * @param tunnel_ctx this is NULL
1968  * @param sender who sent the message
1969  * @param message the actual message
1970  * @param atsi performance data for the connection
1971  * @return GNUNET_OK to keep the connection open,
1972  *         GNUNET_SYSERR to close it (signal serious error)
1973  */
1974 static int
1975 client_handle_receive_close_ack (void *cls,
1976                                  struct GNUNET_MESH_Tunnel *tunnel,
1977                                  void **tunnel_ctx,
1978                                  const struct GNUNET_PeerIdentity *sender,
1979                                  const struct GNUNET_MessageHeader *message,
1980                                  const struct GNUNET_ATS_Information*atsi)
1981 {
1982   struct GNUNET_STREAM_Socket *socket = cls;
1983
1984   return handle_generic_close_ack (socket,
1985                                    tunnel,
1986                                    sender,
1987                                    (const struct GNUNET_STREAM_MessageHeader *)
1988                                    message,
1989                                    atsi,
1990                                    SHUT_RD);
1991 }
1992
1993
1994 /**
1995  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1996  *
1997  * @param socket the socket
1998  * @param tunnel connection to the other end
1999  * @param sender who sent the message
2000  * @param message the actual message
2001  * @param atsi performance data for the connection
2002  * @return GNUNET_OK to keep the connection open,
2003  *         GNUNET_SYSERR to close it (signal serious error)
2004  */
2005 static int
2006 handle_close (struct GNUNET_STREAM_Socket *socket,
2007               struct GNUNET_MESH_Tunnel *tunnel,
2008               const struct GNUNET_PeerIdentity *sender,
2009               const struct GNUNET_STREAM_MessageHeader *message,
2010               const struct GNUNET_ATS_Information*atsi)
2011 {
2012   struct GNUNET_STREAM_MessageHeader *close_ack;
2013
2014   switch (socket->state)
2015   {
2016   case STATE_INIT:
2017   case STATE_LISTEN:
2018   case STATE_HELLO_WAIT:
2019     LOG (GNUNET_ERROR_TYPE_DEBUG,
2020          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2021          GNUNET_i2s (&socket->other_peer));
2022     return GNUNET_OK;
2023   default:
2024     break;
2025   }
2026   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2027        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2028   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2029   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2030   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2031   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2032   if (STATE_CLOSED == socket->state)
2033     return GNUNET_OK;
2034   socket->receive_closed = GNUNET_YES;
2035   socket->transmit_closed = GNUNET_YES;
2036   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
2037   socket->receive_buffer = NULL;
2038   socket->receive_buffer_size = 0;  
2039   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
2040      that that stream has been shutdown */
2041   if (NULL != socket->write_handle)
2042   {
2043     if (NULL != socket->write_handle->write_cont)
2044       socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
2045                                         GNUNET_STREAM_SHUTDOWN, 0);
2046     GNUNET_STREAM_io_write_cancel (socket->write_handle);
2047     socket->write_handle = NULL;
2048   }
2049   return GNUNET_OK;
2050 }
2051
2052
2053 /**
2054  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2055  *
2056  * @param cls the socket (set from GNUNET_MESH_connect)
2057  * @param tunnel connection to the other end
2058  * @param tunnel_ctx this is NULL
2059  * @param sender who sent the message
2060  * @param message the actual message
2061  * @param atsi performance data for the connection
2062  * @return GNUNET_OK to keep the connection open,
2063  *         GNUNET_SYSERR to close it (signal serious error)
2064  */
2065 static int
2066 client_handle_close (void *cls,
2067                      struct GNUNET_MESH_Tunnel *tunnel,
2068                      void **tunnel_ctx,
2069                      const struct GNUNET_PeerIdentity *sender,
2070                      const struct GNUNET_MessageHeader *message,
2071                      const struct GNUNET_ATS_Information*atsi)
2072 {
2073   struct GNUNET_STREAM_Socket *socket = cls;
2074
2075   return handle_close (socket,
2076                        tunnel,
2077                        sender,
2078                        (const struct GNUNET_STREAM_MessageHeader *) message,
2079                        atsi);
2080 }
2081
2082
2083 /**
2084  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2085  *
2086  * @param cls the socket (set from GNUNET_MESH_connect)
2087  * @param tunnel connection to the other end
2088  * @param tunnel_ctx this is NULL
2089  * @param sender who sent the message
2090  * @param message the actual message
2091  * @param atsi performance data for the connection
2092  * @return GNUNET_OK to keep the connection open,
2093  *         GNUNET_SYSERR to close it (signal serious error)
2094  */
2095 static int
2096 client_handle_close_ack (void *cls,
2097                          struct GNUNET_MESH_Tunnel *tunnel,
2098                          void **tunnel_ctx,
2099                          const struct GNUNET_PeerIdentity *sender,
2100                          const struct GNUNET_MessageHeader *message,
2101                          const struct GNUNET_ATS_Information *atsi)
2102 {
2103   struct GNUNET_STREAM_Socket *socket = cls;
2104
2105   return handle_generic_close_ack (socket,
2106                                    tunnel,
2107                                    sender,
2108                                    (const struct GNUNET_STREAM_MessageHeader *) 
2109                                    message,
2110                                    atsi,
2111                                    SHUT_RDWR);
2112 }
2113
2114 /*****************************/
2115 /* Server's Message Handlers */
2116 /*****************************/
2117
2118 /**
2119  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2120  *
2121  * @param cls the closure
2122  * @param tunnel connection to the other end
2123  * @param tunnel_ctx the socket
2124  * @param sender who sent the message
2125  * @param message the actual message
2126  * @param atsi performance data for the connection
2127  * @return GNUNET_OK to keep the connection open,
2128  *         GNUNET_SYSERR to close it (signal serious error)
2129  */
2130 static int
2131 server_handle_data (void *cls,
2132                     struct GNUNET_MESH_Tunnel *tunnel,
2133                     void **tunnel_ctx,
2134                     const struct GNUNET_PeerIdentity *sender,
2135                     const struct GNUNET_MessageHeader *message,
2136                     const struct GNUNET_ATS_Information*atsi)
2137 {
2138   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2139
2140   return handle_data (socket,
2141                       tunnel,
2142                       sender,
2143                       (const struct GNUNET_STREAM_DataMessage *)message,
2144                       atsi);
2145 }
2146
2147
2148 /**
2149  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2150  *
2151  * @param cls the closure
2152  * @param tunnel connection to the other end
2153  * @param tunnel_ctx the socket
2154  * @param sender who sent the message
2155  * @param message the actual message
2156  * @param atsi performance data for the connection
2157  * @return GNUNET_OK to keep the connection open,
2158  *         GNUNET_SYSERR to close it (signal serious error)
2159  */
2160 static int
2161 server_handle_hello (void *cls,
2162                      struct GNUNET_MESH_Tunnel *tunnel,
2163                      void **tunnel_ctx,
2164                      const struct GNUNET_PeerIdentity *sender,
2165                      const struct GNUNET_MessageHeader *message,
2166                      const struct GNUNET_ATS_Information*atsi)
2167 {
2168   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2169   struct GNUNET_STREAM_HelloAckMessage *reply;
2170
2171   if (0 != memcmp (sender,
2172                    &socket->other_peer,
2173                    sizeof (struct GNUNET_PeerIdentity)))
2174   {
2175     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2176                GNUNET_i2s (&socket->other_peer));
2177     return GNUNET_YES;
2178   }
2179   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2180   GNUNET_assert (socket->tunnel == tunnel);
2181   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2182              GNUNET_i2s (&socket->other_peer));
2183   switch (socket->state)
2184   {
2185   case STATE_INIT:
2186     reply = generate_hello_ack (socket, GNUNET_YES);
2187     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2188                    GNUNET_NO);
2189     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2190                    socket->control_retransmission_task_id);
2191     socket->control_retransmission_task_id =
2192       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2193                                     &control_retransmission_task, socket);
2194     break;
2195   case STATE_HELLO_WAIT:
2196     /* Perhaps our HELLO_ACK was lost */
2197     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2198                    socket->control_retransmission_task_id);
2199     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2200     socket->control_retransmission_task_id =
2201       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2202     break;
2203   default:
2204     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2205                GNUNET_i2s (&socket->other_peer), socket->state);
2206     /* FIXME: Send RESET? */
2207   }
2208   return GNUNET_OK;
2209 }
2210
2211
2212 /**
2213  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2214  *
2215  * @param cls the closure
2216  * @param tunnel connection to the other end
2217  * @param tunnel_ctx the socket
2218  * @param sender who sent the message
2219  * @param message the actual message
2220  * @param atsi performance data for the connection
2221  * @return GNUNET_OK to keep the connection open,
2222  *         GNUNET_SYSERR to close it (signal serious error)
2223  */
2224 static int
2225 server_handle_hello_ack (void *cls,
2226                          struct GNUNET_MESH_Tunnel *tunnel,
2227                          void **tunnel_ctx,
2228                          const struct GNUNET_PeerIdentity *sender,
2229                          const struct GNUNET_MessageHeader *message,
2230                          const struct GNUNET_ATS_Information*atsi)
2231 {
2232   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2233   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2234
2235   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2236                  ntohs (message->type));
2237   GNUNET_assert (socket->tunnel == tunnel);
2238   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2239   switch (socket->state)  
2240   {
2241   case STATE_HELLO_WAIT:
2242     LOG (GNUNET_ERROR_TYPE_DEBUG,
2243          "%s: Received HELLO_ACK from %s\n",
2244          GNUNET_i2s (&socket->other_peer),
2245          GNUNET_i2s (&socket->other_peer));
2246     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2247     LOG (GNUNET_ERROR_TYPE_DEBUG,
2248          "%s: Read sequence number %u\n",
2249          GNUNET_i2s (&socket->other_peer),
2250          (unsigned int) socket->read_sequence_number);
2251     socket->receiver_window_available = 
2252       ntohl (ack_message->receiver_window_size);
2253     set_state_established (NULL, socket);
2254     break;
2255   default:
2256     LOG (GNUNET_ERROR_TYPE_DEBUG,
2257          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2258   }
2259   return GNUNET_OK;
2260 }
2261
2262
2263 /**
2264  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2265  *
2266  * @param cls the closure
2267  * @param tunnel connection to the other end
2268  * @param tunnel_ctx the socket
2269  * @param sender who sent the message
2270  * @param message the actual message
2271  * @param atsi performance data for the connection
2272  * @return GNUNET_OK to keep the connection open,
2273  *         GNUNET_SYSERR to close it (signal serious error)
2274  */
2275 static int
2276 server_handle_reset (void *cls,
2277                      struct GNUNET_MESH_Tunnel *tunnel,
2278                      void **tunnel_ctx,
2279                      const struct GNUNET_PeerIdentity *sender,
2280                      const struct GNUNET_MessageHeader *message,
2281                      const struct GNUNET_ATS_Information*atsi)
2282 {
2283   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2284
2285   return GNUNET_OK;
2286 }
2287
2288
2289 /**
2290  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2291  *
2292  * @param cls the closure
2293  * @param tunnel connection to the other end
2294  * @param tunnel_ctx the socket
2295  * @param sender who sent the message
2296  * @param message the actual message
2297  * @param atsi performance data for the connection
2298  * @return GNUNET_OK to keep the connection open,
2299  *         GNUNET_SYSERR to close it (signal serious error)
2300  */
2301 static int
2302 server_handle_transmit_close (void *cls,
2303                               struct GNUNET_MESH_Tunnel *tunnel,
2304                               void **tunnel_ctx,
2305                               const struct GNUNET_PeerIdentity *sender,
2306                               const struct GNUNET_MessageHeader *message,
2307                               const struct GNUNET_ATS_Information*atsi)
2308 {
2309   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2310
2311   return handle_transmit_close (socket,
2312                                 tunnel,
2313                                 sender,
2314                                 (struct GNUNET_STREAM_MessageHeader *)message,
2315                                 atsi);
2316 }
2317
2318
2319 /**
2320  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2321  *
2322  * @param cls the closure
2323  * @param tunnel connection to the other end
2324  * @param tunnel_ctx the socket
2325  * @param sender who sent the message
2326  * @param message the actual message
2327  * @param atsi performance data for the connection
2328  * @return GNUNET_OK to keep the connection open,
2329  *         GNUNET_SYSERR to close it (signal serious error)
2330  */
2331 static int
2332 server_handle_transmit_close_ack (void *cls,
2333                                   struct GNUNET_MESH_Tunnel *tunnel,
2334                                   void **tunnel_ctx,
2335                                   const struct GNUNET_PeerIdentity *sender,
2336                                   const struct GNUNET_MessageHeader *message,
2337                                   const struct GNUNET_ATS_Information*atsi)
2338 {
2339   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2340
2341   return handle_generic_close_ack (socket,
2342                                    tunnel,
2343                                    sender,
2344                                    (const struct GNUNET_STREAM_MessageHeader *)
2345                                    message,
2346                                    atsi,
2347                                    SHUT_WR);
2348 }
2349
2350
2351 /**
2352  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2353  *
2354  * @param cls the closure
2355  * @param tunnel connection to the other end
2356  * @param tunnel_ctx the socket
2357  * @param sender who sent the message
2358  * @param message the actual message
2359  * @param atsi performance data for the connection
2360  * @return GNUNET_OK to keep the connection open,
2361  *         GNUNET_SYSERR to close it (signal serious error)
2362  */
2363 static int
2364 server_handle_receive_close (void *cls,
2365                              struct GNUNET_MESH_Tunnel *tunnel,
2366                              void **tunnel_ctx,
2367                              const struct GNUNET_PeerIdentity *sender,
2368                              const struct GNUNET_MessageHeader *message,
2369                              const struct GNUNET_ATS_Information*atsi)
2370 {
2371   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2372
2373   return
2374     handle_receive_close (socket,
2375                           tunnel,
2376                           sender,
2377                           (const struct GNUNET_STREAM_MessageHeader *) message,
2378                           atsi);
2379 }
2380
2381
2382 /**
2383  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2384  *
2385  * @param cls the closure
2386  * @param tunnel connection to the other end
2387  * @param tunnel_ctx the socket
2388  * @param sender who sent the message
2389  * @param message the actual message
2390  * @param atsi performance data for the connection
2391  * @return GNUNET_OK to keep the connection open,
2392  *         GNUNET_SYSERR to close it (signal serious error)
2393  */
2394 static int
2395 server_handle_receive_close_ack (void *cls,
2396                                  struct GNUNET_MESH_Tunnel *tunnel,
2397                                  void **tunnel_ctx,
2398                                  const struct GNUNET_PeerIdentity *sender,
2399                                  const struct GNUNET_MessageHeader *message,
2400                                  const struct GNUNET_ATS_Information*atsi)
2401 {
2402   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2403
2404   return handle_generic_close_ack (socket,
2405                                    tunnel,
2406                                    sender,
2407                                    (const struct GNUNET_STREAM_MessageHeader *)
2408                                    message,
2409                                    atsi,
2410                                    SHUT_RD);
2411 }
2412
2413
2414 /**
2415  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2416  *
2417  * @param cls the listen socket (from GNUNET_MESH_connect in
2418  *          GNUNET_STREAM_listen) 
2419  * @param tunnel connection to the other end
2420  * @param tunnel_ctx the socket
2421  * @param sender who sent the message
2422  * @param message the actual message
2423  * @param atsi performance data for the connection
2424  * @return GNUNET_OK to keep the connection open,
2425  *         GNUNET_SYSERR to close it (signal serious error)
2426  */
2427 static int
2428 server_handle_close (void *cls,
2429                      struct GNUNET_MESH_Tunnel *tunnel,
2430                      void **tunnel_ctx,
2431                      const struct GNUNET_PeerIdentity *sender,
2432                      const struct GNUNET_MessageHeader *message,
2433                      const struct GNUNET_ATS_Information*atsi)
2434 {
2435   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2436   
2437   return handle_close (socket,
2438                        tunnel,
2439                        sender,
2440                        (const struct GNUNET_STREAM_MessageHeader *) message,
2441                        atsi);
2442 }
2443
2444
2445 /**
2446  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2447  *
2448  * @param cls the closure
2449  * @param tunnel connection to the other end
2450  * @param tunnel_ctx the socket
2451  * @param sender who sent the message
2452  * @param message the actual message
2453  * @param atsi performance data for the connection
2454  * @return GNUNET_OK to keep the connection open,
2455  *         GNUNET_SYSERR to close it (signal serious error)
2456  */
2457 static int
2458 server_handle_close_ack (void *cls,
2459                          struct GNUNET_MESH_Tunnel *tunnel,
2460                          void **tunnel_ctx,
2461                          const struct GNUNET_PeerIdentity *sender,
2462                          const struct GNUNET_MessageHeader *message,
2463                          const struct GNUNET_ATS_Information*atsi)
2464 {
2465   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2466
2467   return handle_generic_close_ack (socket,
2468                                    tunnel,
2469                                    sender,
2470                                    (const struct GNUNET_STREAM_MessageHeader *) 
2471                                    message,
2472                                    atsi,
2473                                    SHUT_RDWR);
2474 }
2475
2476
2477 /**
2478  * Handler for DATA_ACK messages
2479  *
2480  * @param socket the socket through which the ack was received
2481  * @param tunnel connection to the other end
2482  * @param sender who sent the message
2483  * @param ack the acknowledgment message
2484  * @param atsi performance data for the connection
2485  * @return GNUNET_OK to keep the connection open,
2486  *         GNUNET_SYSERR to close it (signal serious error)
2487  */
2488 static int
2489 handle_ack (struct GNUNET_STREAM_Socket *socket,
2490             struct GNUNET_MESH_Tunnel *tunnel,
2491             const struct GNUNET_PeerIdentity *sender,
2492             const struct GNUNET_STREAM_AckMessage *ack,
2493             const struct GNUNET_ATS_Information*atsi)
2494 {
2495   unsigned int packet;
2496   int need_retransmission;
2497   uint32_t sequence_difference;
2498   
2499   if (0 != memcmp (sender,
2500                    &socket->other_peer,
2501                    sizeof (struct GNUNET_PeerIdentity)))
2502   {
2503     LOG (GNUNET_ERROR_TYPE_DEBUG,
2504          "%s: Received ACK from non-confirming peer\n",
2505          GNUNET_i2s (&socket->other_peer));
2506     return GNUNET_YES;
2507   }
2508   switch (socket->state)
2509   {
2510   case (STATE_ESTABLISHED):
2511   case (STATE_RECEIVE_CLOSED):
2512   case (STATE_RECEIVE_CLOSE_WAIT):
2513     if (NULL == socket->write_handle)
2514     {
2515       LOG (GNUNET_ERROR_TYPE_DEBUG,
2516            "%s: Received DATA_ACK when write_handle is NULL\n",
2517            GNUNET_i2s (&socket->other_peer));
2518       return GNUNET_OK;
2519     }
2520     sequence_difference = 
2521         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2522     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2523     {
2524       LOG (GNUNET_ERROR_TYPE_DEBUG,
2525            "%s: Received DATA_ACK with unexpected base sequence number\n",
2526            GNUNET_i2s (&socket->other_peer));
2527       LOG (GNUNET_ERROR_TYPE_DEBUG,
2528            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2529            GNUNET_i2s (&socket->other_peer),
2530            socket->write_sequence_number,
2531            ntohl (ack->base_sequence_number));
2532       return GNUNET_OK;
2533     }
2534     /* FIXME: include the case when write_handle is cancelled - ignore the 
2535        acks */
2536     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2537          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2538     /* Cancel the retransmission task */
2539     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2540     {
2541       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2542       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2543     }
2544     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2545     {
2546       if (NULL == socket->write_handle->messages[packet]) break;
2547       /* BS: Base sequence from ack; PS: sequence num of current packet */
2548       sequence_difference = ntohl (ack->base_sequence_number)
2549         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2550       if ((0 == sequence_difference) ||
2551           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2552         continue; /* The message in our handle is not yet received */
2553       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2554       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2555       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2556                             packet, GNUNET_YES);
2557     }
2558     /* Update the receive window remaining
2559        FIXME : Should update with the value from a data ack with greater
2560        sequence number */
2561     socket->receiver_window_available = 
2562       ntohl (ack->receive_window_remaining);
2563     /* Check if we have received all acknowledgements */
2564     need_retransmission = GNUNET_NO;
2565     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2566     {
2567       if (NULL == socket->write_handle->messages[packet]) break;
2568       if (GNUNET_YES != ackbitmap_is_bit_set 
2569           (&socket->write_handle->ack_bitmap,packet))
2570       {
2571         need_retransmission = GNUNET_YES;
2572         break;
2573       }
2574     }
2575     if (GNUNET_YES == need_retransmission)
2576     {
2577       write_data (socket);
2578     }
2579     else      /* We have to call the write continuation callback now */
2580     {
2581       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2582       
2583       /* Free the packets */
2584       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2585       {
2586         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2587       }
2588       write_handle = socket->write_handle;
2589       socket->write_handle = NULL;
2590       if (NULL != write_handle->write_cont)
2591         write_handle->write_cont (write_handle->write_cont_cls,
2592                                   socket->status,
2593                                   write_handle->size);
2594       /* We are done with the write handle - Freeing it */
2595       GNUNET_free (write_handle);
2596       LOG (GNUNET_ERROR_TYPE_DEBUG,
2597            "%s: Write completion callback completed\n",
2598            GNUNET_i2s (&socket->other_peer));      
2599     }
2600     break;
2601   default:
2602     break;
2603   }
2604   return GNUNET_OK;
2605 }
2606
2607
2608 /**
2609  * Handler for DATA_ACK messages
2610  *
2611  * @param cls the 'struct GNUNET_STREAM_Socket'
2612  * @param tunnel connection to the other end
2613  * @param tunnel_ctx unused
2614  * @param sender who sent the message
2615  * @param message the actual message
2616  * @param atsi performance data for the connection
2617  * @return GNUNET_OK to keep the connection open,
2618  *         GNUNET_SYSERR to close it (signal serious error)
2619  */
2620 static int
2621 client_handle_ack (void *cls,
2622                    struct GNUNET_MESH_Tunnel *tunnel,
2623                    void **tunnel_ctx,
2624                    const struct GNUNET_PeerIdentity *sender,
2625                    const struct GNUNET_MessageHeader *message,
2626                    const struct GNUNET_ATS_Information*atsi)
2627 {
2628   struct GNUNET_STREAM_Socket *socket = cls;
2629   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2630  
2631   return handle_ack (socket, tunnel, sender, ack, atsi);
2632 }
2633
2634
2635 /**
2636  * Handler for DATA_ACK messages
2637  *
2638  * @param cls the server's listen socket
2639  * @param tunnel connection to the other end
2640  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2641  * @param sender who sent the message
2642  * @param message the actual message
2643  * @param atsi performance data for the connection
2644  * @return GNUNET_OK to keep the connection open,
2645  *         GNUNET_SYSERR to close it (signal serious error)
2646  */
2647 static int
2648 server_handle_ack (void *cls,
2649                    struct GNUNET_MESH_Tunnel *tunnel,
2650                    void **tunnel_ctx,
2651                    const struct GNUNET_PeerIdentity *sender,
2652                    const struct GNUNET_MessageHeader *message,
2653                    const struct GNUNET_ATS_Information*atsi)
2654 {
2655   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2656   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2657  
2658   return handle_ack (socket, tunnel, sender, ack, atsi);
2659 }
2660
2661
2662 /**
2663  * For client message handlers, the stream socket is in the
2664  * closure argument.
2665  */
2666 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2667   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2668   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2669    sizeof (struct GNUNET_STREAM_AckMessage) },
2670   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2671    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2672   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2673    sizeof (struct GNUNET_STREAM_MessageHeader)},
2674   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2675    sizeof (struct GNUNET_STREAM_MessageHeader)},
2676   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2677    sizeof (struct GNUNET_STREAM_MessageHeader)},
2678   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2679    sizeof (struct GNUNET_STREAM_MessageHeader)},
2680   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2681    sizeof (struct GNUNET_STREAM_MessageHeader)},
2682   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2683    sizeof (struct GNUNET_STREAM_MessageHeader)},
2684   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2685    sizeof (struct GNUNET_STREAM_MessageHeader)},
2686   {NULL, 0, 0}
2687 };
2688
2689
2690 /**
2691  * For server message handlers, the stream socket is in the
2692  * tunnel context, and the listen socket in the closure argument.
2693  */
2694 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2695   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2696   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2697    sizeof (struct GNUNET_STREAM_AckMessage) },
2698   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2699    sizeof (struct GNUNET_STREAM_MessageHeader)},
2700   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2701    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2702   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2703    sizeof (struct GNUNET_STREAM_MessageHeader)},
2704   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2705    sizeof (struct GNUNET_STREAM_MessageHeader)},
2706   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2707    sizeof (struct GNUNET_STREAM_MessageHeader)},
2708   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2709    sizeof (struct GNUNET_STREAM_MessageHeader)},
2710   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2711    sizeof (struct GNUNET_STREAM_MessageHeader)},
2712   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2713    sizeof (struct GNUNET_STREAM_MessageHeader)},
2714   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2715    sizeof (struct GNUNET_STREAM_MessageHeader)},
2716   {NULL, 0, 0}
2717 };
2718
2719
2720 /**
2721  * Function called when our target peer is connected to our tunnel
2722  *
2723  * @param cls the socket for which this tunnel is created
2724  * @param peer the peer identity of the target
2725  * @param atsi performance data for the connection
2726  */
2727 static void
2728 mesh_peer_connect_callback (void *cls,
2729                             const struct GNUNET_PeerIdentity *peer,
2730                             const struct GNUNET_ATS_Information * atsi)
2731 {
2732   struct GNUNET_STREAM_Socket *socket = cls;
2733   struct GNUNET_STREAM_MessageHeader *message;
2734   
2735   if (0 != memcmp (peer,
2736                    &socket->other_peer,
2737                    sizeof (struct GNUNET_PeerIdentity)))
2738   {
2739     LOG (GNUNET_ERROR_TYPE_DEBUG,
2740          "%s: A peer which is not our target has connected to our tunnel\n",
2741          GNUNET_i2s(peer));
2742     return;
2743   }
2744   LOG (GNUNET_ERROR_TYPE_DEBUG,
2745        "%s: Target peer %s connected\n",
2746        GNUNET_i2s (&socket->other_peer),
2747        GNUNET_i2s (&socket->other_peer));
2748   /* Set state to INIT */
2749   socket->state = STATE_INIT;
2750   /* Send HELLO message */
2751   message = generate_hello ();
2752   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2753   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2754                  socket->control_retransmission_task_id);
2755   socket->control_retransmission_task_id =
2756     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2757                                   &control_retransmission_task, socket);
2758 }
2759
2760
2761 /**
2762  * Function called when our target peer is disconnected from our tunnel
2763  *
2764  * @param cls the socket associated which this tunnel
2765  * @param peer the peer identity of the target
2766  */
2767 static void
2768 mesh_peer_disconnect_callback (void *cls,
2769                                const struct GNUNET_PeerIdentity *peer)
2770 {
2771   struct GNUNET_STREAM_Socket *socket=cls;
2772   
2773   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2774   LOG (GNUNET_ERROR_TYPE_DEBUG,
2775        "%s: Other peer %s disconnected \n",
2776        GNUNET_i2s (&socket->other_peer),
2777        GNUNET_i2s (&socket->other_peer));
2778 }
2779
2780
2781 /**
2782  * Method called whenever a peer creates a tunnel to us
2783  *
2784  * @param cls closure
2785  * @param tunnel new handle to the tunnel
2786  * @param initiator peer that started the tunnel
2787  * @param atsi performance information for the tunnel
2788  * @return initial tunnel context for the tunnel
2789  *         (can be NULL -- that's not an error)
2790  */
2791 static void *
2792 new_tunnel_notify (void *cls,
2793                    struct GNUNET_MESH_Tunnel *tunnel,
2794                    const struct GNUNET_PeerIdentity *initiator,
2795                    const struct GNUNET_ATS_Information *atsi)
2796 {
2797   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2798   struct GNUNET_STREAM_Socket *socket;
2799
2800   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2801      from the same peer again until the socket is closed */
2802
2803   if (GNUNET_NO == lsocket->listening)
2804   {
2805     GNUNET_MESH_tunnel_destroy (tunnel);
2806     return NULL;
2807   }
2808   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2809   socket->other_peer = *initiator;
2810   socket->tunnel = tunnel;
2811   socket->state = STATE_INIT;
2812   socket->lsocket = lsocket;
2813   socket->stat_handle = lsocket->stat_handle;
2814   socket->retransmit_timeout = lsocket->retransmit_timeout;
2815   socket->testing_active = lsocket->testing_active;
2816   socket->testing_set_write_sequence_number_value =
2817       lsocket->testing_set_write_sequence_number_value;
2818   socket->max_payload_size = lsocket->max_payload_size;
2819   LOG (GNUNET_ERROR_TYPE_DEBUG,
2820        "%s: Peer %s initiated tunnel to us\n", 
2821        GNUNET_i2s (&socket->other_peer),
2822        GNUNET_i2s (&socket->other_peer));
2823   if (NULL != socket->stat_handle)
2824   {
2825     GNUNET_STATISTICS_update (socket->stat_handle,
2826                               "total inbound connections received",
2827                               1, GNUNET_NO);
2828     GNUNET_STATISTICS_update (socket->stat_handle,
2829                               "inbound connections", 1, GNUNET_NO);
2830   }
2831   
2832   return socket;
2833 }
2834
2835
2836 /**
2837  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2838  * any associated state.  This function is NOT called if the client has
2839  * explicitly asked for the tunnel to be destroyed using
2840  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2841  * the tunnel.
2842  *
2843  * @param cls closure (set from GNUNET_MESH_connect)
2844  * @param tunnel connection to the other end (henceforth invalid)
2845  * @param tunnel_ctx place where local state associated
2846  *                   with the tunnel is stored
2847  */
2848 static void 
2849 tunnel_cleaner (void *cls,
2850                 const struct GNUNET_MESH_Tunnel *tunnel,
2851                 void *tunnel_ctx)
2852 {
2853   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2854   struct MessageQueue *head;
2855
2856   GNUNET_assert (tunnel == socket->tunnel);
2857   GNUNET_break_op(0);
2858   LOG (GNUNET_ERROR_TYPE_DEBUG,
2859        "%s: Peer %s has terminated connection abruptly\n",
2860        GNUNET_i2s (&socket->other_peer),
2861        GNUNET_i2s (&socket->other_peer));
2862   if (NULL != socket->stat_handle)
2863   {
2864     GNUNET_STATISTICS_update (socket->stat_handle,
2865                               "connections terminated abruptly", 1, GNUNET_NO);
2866     GNUNET_STATISTICS_update (socket->stat_handle,
2867                               "inbound connections", -1, GNUNET_NO);
2868   }
2869   socket->status = GNUNET_STREAM_SHUTDOWN;
2870   /* Clear Transmit handles */
2871   if (NULL != socket->transmit_handle)
2872   {
2873     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2874     socket->transmit_handle = NULL;
2875   }
2876   /* Stop Tasks using socket->tunnel */
2877   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2878   {
2879     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2880     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2881   }
2882   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2883   {
2884     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2885     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2886   }  
2887   /* Terminate the control retransmission tasks */
2888   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2889   {
2890     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2891     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2892   }
2893   /* Clear Transmit handles */
2894   if (NULL != socket->transmit_handle)
2895   {
2896     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2897     socket->transmit_handle = NULL;
2898   }
2899   /* Clear existing message queue */
2900   while (NULL != (head = socket->queue_head)) {
2901     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2902                                  socket->queue_tail,
2903                                  head);
2904     GNUNET_free (head->message);
2905     GNUNET_free (head);
2906   }
2907   socket->tunnel = NULL;
2908 }
2909
2910
2911 /**
2912  * Callback to signal timeout on lockmanager lock acquire
2913  *
2914  * @param cls the ListenSocket
2915  * @param tc the scheduler task context
2916  */
2917 static void
2918 lockmanager_acquire_timeout (void *cls, 
2919                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2920 {
2921   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2922   GNUNET_STREAM_ListenCallback listen_cb;
2923   void *listen_cb_cls;
2924
2925   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2926   listen_cb = lsocket->listen_cb;
2927   listen_cb_cls = lsocket->listen_cb_cls;
2928   if (NULL != listen_cb)
2929     listen_cb (listen_cb_cls, NULL, NULL);
2930 }
2931
2932
2933 /**
2934  * Callback to notify us on the status changes on app_port lock
2935  *
2936  * @param cls the ListenSocket
2937  * @param domain the domain name of the lock
2938  * @param lock the app_port
2939  * @param status the current status of the lock
2940  */
2941 static void
2942 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2943                        enum GNUNET_LOCKMANAGER_Status status)
2944 {
2945   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2946
2947   GNUNET_assert (lock == (uint32_t) lsocket->port);
2948   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2949   {
2950     lsocket->listening = GNUNET_YES;
2951     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2952     {
2953       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2954       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2955     }
2956     if (NULL == lsocket->mesh)
2957     {
2958       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2959
2960       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2961                                            lsocket, /* Closure */
2962                                            &new_tunnel_notify,
2963                                            &tunnel_cleaner,
2964                                            server_message_handlers,
2965                                            ports);
2966       GNUNET_assert (NULL != lsocket->mesh);
2967       if (NULL != lsocket->listen_ok_cb)
2968       {
2969         (void) lsocket->listen_ok_cb ();
2970       }
2971     }
2972   }
2973   if (GNUNET_LOCKMANAGER_RELEASE == status)
2974     lsocket->listening = GNUNET_NO;
2975 }
2976
2977
2978 /*****************/
2979 /* API functions */
2980 /*****************/
2981
2982
2983 /**
2984  * Tries to open a stream to the target peer
2985  *
2986  * @param cfg configuration to use
2987  * @param target the target peer to which the stream has to be opened
2988  * @param app_port the application port number which uniquely identifies this
2989  *            stream
2990  * @param open_cb this function will be called after stream has be established;
2991  *          cannot be NULL
2992  * @param open_cb_cls the closure for open_cb
2993  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2994  * @return if successful it returns the stream socket; NULL if stream cannot be
2995  *         opened 
2996  */
2997 struct GNUNET_STREAM_Socket *
2998 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2999                     const struct GNUNET_PeerIdentity *target,
3000                     GNUNET_MESH_ApplicationType app_port,
3001                     GNUNET_STREAM_OpenCallback open_cb,
3002                     void *open_cb_cls,
3003                     ...)
3004 {
3005   struct GNUNET_STREAM_Socket *socket;
3006   enum GNUNET_STREAM_Option option;
3007   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
3008   va_list vargs;
3009   uint16_t payload_size;
3010
3011   LOG (GNUNET_ERROR_TYPE_DEBUG,
3012        "%s\n", __func__);
3013   GNUNET_assert (NULL != open_cb);
3014   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3015   socket->other_peer = *target;
3016   socket->open_cb = open_cb;
3017   socket->open_cls = open_cb_cls;
3018   /* Set defaults */
3019   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3020   socket->testing_active = GNUNET_NO;
3021   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3022   va_start (vargs, open_cb_cls); /* Parse variable args */
3023   do {
3024     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3025     switch (option)
3026     {
3027     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3028       /* Expect struct GNUNET_TIME_Relative */
3029       socket->retransmit_timeout = va_arg (vargs,
3030                                            struct GNUNET_TIME_Relative);
3031       break;
3032     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3033       socket->testing_active = GNUNET_YES;
3034       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3035                                                                 uint32_t);
3036       break;
3037     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3038       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3039       break;
3040     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3041       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3042       break;
3043     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3044       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3045       GNUNET_assert (0 != payload_size);
3046       if (payload_size < socket->max_payload_size)
3047         socket->max_payload_size = payload_size;
3048       break;
3049     case GNUNET_STREAM_OPTION_END:
3050       break;
3051     }
3052   } while (GNUNET_STREAM_OPTION_END != option);
3053   va_end (vargs);               /* End of variable args parsing */
3054   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3055                                       socket, /* cls */
3056                                       NULL, /* No inbound tunnel handler */
3057                                       NULL, /* No in-tunnel cleaner */
3058                                       client_message_handlers,
3059                                       ports); /* We don't get inbound tunnels */
3060   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3061   {
3062     GNUNET_free (socket);
3063     return NULL;
3064   }
3065   /* Now create the mesh tunnel to target */
3066   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3067   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3068                                               NULL, /* Tunnel context */
3069                                               &mesh_peer_connect_callback,
3070                                               &mesh_peer_disconnect_callback,
3071                                               socket);
3072   GNUNET_assert (NULL != socket->tunnel);
3073   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3074                                         &socket->other_peer);
3075   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3076   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3077   return socket;
3078 }
3079
3080
3081 /**
3082  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3083  *
3084  * @param socket the stream socket
3085  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3086  * @param completion_cb the callback that will be called upon successful
3087  *          shutdown of given operation
3088  * @param completion_cls the closure for the completion callback
3089  * @return the shutdown handle
3090  */
3091 struct GNUNET_STREAM_ShutdownHandle *
3092 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3093                         int operation,
3094                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3095                         void *completion_cls)
3096 {
3097   struct GNUNET_STREAM_ShutdownHandle *handle;
3098   struct GNUNET_STREAM_MessageHeader *msg;
3099   
3100   GNUNET_assert (NULL == socket->shutdown_handle);
3101   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3102   handle->socket = socket;
3103   handle->completion_cb = completion_cb;
3104   handle->completion_cls = completion_cls;
3105   socket->shutdown_handle = handle;
3106   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3107        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3108        || ((GNUNET_YES == socket->transmit_closed) 
3109            && (GNUNET_YES == socket->receive_closed)
3110            && (SHUT_RDWR == operation)) )
3111   {
3112     handle->operation = operation;
3113     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3114                                                           socket);
3115     return handle;
3116   }
3117   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3118   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3119   switch (operation)
3120   {
3121   case SHUT_RD:
3122     handle->operation = SHUT_RD;
3123     if (NULL != socket->read_handle)
3124       LOG (GNUNET_ERROR_TYPE_WARNING,
3125            "Existing read handle should be cancelled before shutting"
3126            " down reading\n");
3127     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3128     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3129                    GNUNET_NO);
3130     socket->receive_closed = GNUNET_YES;
3131     break;
3132   case SHUT_WR:
3133     handle->operation = SHUT_WR;
3134     if (NULL != socket->write_handle)
3135       LOG (GNUNET_ERROR_TYPE_WARNING,
3136            "Existing write handle should be cancelled before shutting"
3137            " down writing\n");
3138     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3139     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3140                    GNUNET_NO);
3141     socket->transmit_closed = GNUNET_YES;
3142     break;
3143   case SHUT_RDWR:
3144     handle->operation = SHUT_RDWR;
3145     if (NULL != socket->write_handle)
3146       LOG (GNUNET_ERROR_TYPE_WARNING,
3147            "Existing write handle should be cancelled before shutting"
3148            " down writing\n");
3149     if (NULL != socket->read_handle)
3150       LOG (GNUNET_ERROR_TYPE_WARNING,
3151            "Existing read handle should be cancelled before shutting"
3152            " down reading\n");
3153     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3154     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3155     socket->transmit_closed = GNUNET_YES;
3156     socket->receive_closed = GNUNET_YES;
3157     break;
3158   default:
3159     LOG (GNUNET_ERROR_TYPE_WARNING,
3160          "GNUNET_STREAM_shutdown called with invalid value for "
3161          "parameter operation -- Ignoring\n");
3162     GNUNET_free (msg);
3163     GNUNET_free (handle);
3164     return NULL;
3165   }
3166   handle->close_msg_retransmission_task_id =
3167     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3168                                   &close_msg_retransmission_task,
3169                                   handle);
3170   return handle;
3171 }
3172
3173
3174 /**
3175  * Cancels a pending shutdown. Note that the shutdown messages may already be
3176  * sent and the stream is shutdown already for the operation given to
3177  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3178  * shutdown messages and frees the shutdown handle.
3179  *
3180  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3181  */
3182 void
3183 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3184 {
3185   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3186     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3187   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3188     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3189   handle->socket->shutdown_handle = NULL;
3190   GNUNET_free (handle);
3191 }
3192
3193
3194 /**
3195  * Closes the stream
3196  *
3197  * @param socket the stream socket
3198  */
3199 void
3200 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3201 {
3202   struct MessageQueue *head;
3203
3204   if (NULL != socket->read_handle)
3205   {
3206     LOG (GNUNET_ERROR_TYPE_WARNING,
3207          "Closing STREAM socket when a read handle is pending\n");
3208     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3209   }
3210   if (NULL != socket->write_handle)
3211   {
3212     LOG (GNUNET_ERROR_TYPE_WARNING,
3213          "Closing STREAM socket when a write handle is pending\n");
3214     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3215     //socket->write_handle = NULL;
3216   }
3217   GNUNET_break (GNUNET_YES == socket->receive_closed);
3218   GNUNET_break (GNUNET_YES == socket->transmit_closed);
3219   /* Terminate the ack'ing task if they are still present */
3220   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3221   {
3222     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3223     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3224   }
3225   /* Terminate the control retransmission tasks */
3226   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3227   {
3228     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3229   }
3230   /* Clear Transmit handles */
3231   if (NULL != socket->transmit_handle)
3232   {
3233     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3234     socket->transmit_handle = NULL;
3235   }
3236   /* Clear existing message queue */
3237   while (NULL != (head = socket->queue_head)) {
3238     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3239                                  socket->queue_tail,
3240                                  head);
3241     GNUNET_free (head->message);
3242     GNUNET_free (head);
3243   }
3244   /* Close associated tunnel */
3245   if (NULL != socket->tunnel)
3246   {
3247     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3248     socket->tunnel = NULL;
3249   }
3250   /* Close mesh connection */
3251   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3252   {
3253     GNUNET_MESH_disconnect (socket->mesh);
3254     socket->mesh = NULL;
3255   }
3256   /* Close statistics connection */
3257   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3258     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3259   /* Release receive buffer */
3260   if (NULL != socket->receive_buffer)
3261   {
3262     GNUNET_free (socket->receive_buffer);
3263   }
3264   GNUNET_free (socket);
3265 }
3266
3267
3268 /**
3269  * Listens for stream connections for a specific application ports
3270  *
3271  * @param cfg the configuration to use
3272  * @param app_port the application port for which new streams will be accepted
3273  * @param listen_cb this function will be called when a peer tries to establish
3274  *            a stream with us
3275  * @param listen_cb_cls closure for listen_cb
3276  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3277  * @return listen socket, NULL for any error
3278  */
3279 struct GNUNET_STREAM_ListenSocket *
3280 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3281                       GNUNET_MESH_ApplicationType app_port,
3282                       GNUNET_STREAM_ListenCallback listen_cb,
3283                       void *listen_cb_cls,
3284                       ...)
3285 {
3286   struct GNUNET_STREAM_ListenSocket *lsocket;
3287   struct GNUNET_TIME_Relative listen_timeout;
3288   enum GNUNET_STREAM_Option option;
3289   va_list vargs;
3290   uint16_t payload_size;
3291
3292   GNUNET_assert (NULL != listen_cb);
3293   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3294   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3295   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3296   if (NULL == lsocket->lockmanager)
3297   {
3298     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3299     GNUNET_free (lsocket);
3300     return NULL;
3301   }
3302   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3303   /* Set defaults */
3304   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3305   lsocket->testing_active = GNUNET_NO;
3306   lsocket->listen_ok_cb = NULL;
3307   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3308   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3309   va_start (vargs, listen_cb_cls);
3310   do {
3311     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3312     switch (option)
3313     {
3314     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3315       lsocket->retransmit_timeout = va_arg (vargs,
3316                                             struct GNUNET_TIME_Relative);
3317       break;
3318     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3319       lsocket->testing_active = GNUNET_YES;
3320       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3321                                                                  uint32_t);
3322       break;
3323     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3324       listen_timeout = GNUNET_TIME_relative_multiply
3325         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3326       break;
3327     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3328       lsocket->listen_ok_cb = va_arg (vargs,
3329                                       GNUNET_STREAM_ListenSuccessCallback);
3330       break;
3331     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3332       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3333       GNUNET_assert (0 != payload_size);
3334       if (payload_size < lsocket->max_payload_size)
3335         lsocket->max_payload_size = payload_size;
3336       break;
3337     case GNUNET_STREAM_OPTION_END:
3338       break;
3339     }
3340   } while (GNUNET_STREAM_OPTION_END != option);
3341   va_end (vargs);
3342   lsocket->port = app_port;
3343   lsocket->listen_cb = listen_cb;
3344   lsocket->listen_cb_cls = listen_cb_cls;
3345   lsocket->locking_request = 
3346     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3347                                      (uint32_t) lsocket->port,
3348                                      &lock_status_change_cb, lsocket);
3349   lsocket->lockmanager_acquire_timeout_task =
3350     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3351                                   &lockmanager_acquire_timeout, lsocket);
3352   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3353                                                    lsocket->cfg);
3354   return lsocket;
3355 }
3356
3357
3358 /**
3359  * Closes the listen socket
3360  *
3361  * @param lsocket the listen socket
3362  */
3363 void
3364 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3365 {
3366   /* Close MESH connection */
3367   if (NULL != lsocket->mesh)
3368     GNUNET_MESH_disconnect (lsocket->mesh);
3369   if (NULL != lsocket->stat_handle)
3370     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3371   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3372   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3373     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3374   if (NULL != lsocket->locking_request)
3375     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3376   if (NULL != lsocket->lockmanager)
3377     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3378   GNUNET_free (lsocket);
3379 }
3380
3381
3382 /**
3383  * Tries to write the given data to the stream. The maximum size of data that
3384  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3385  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3386  * violation, however only the said number of maximum bytes will be written.
3387  *
3388  * @param socket the socket representing a stream
3389  * @param data the data buffer from where the data is written into the stream
3390  * @param size the number of bytes to be written from the data buffer
3391  * @param timeout the timeout period
3392  * @param write_cont the function to call upon writing some bytes into the
3393  *          stream 
3394  * @param write_cont_cls the closure
3395  *
3396  * @return handle to cancel the operation; if a previous write is pending or
3397  *           the stream has been shutdown for this operation then write_cont is
3398  *           immediately called and NULL is returned.
3399  */
3400 struct GNUNET_STREAM_IOWriteHandle *
3401 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3402                      const void *data,
3403                      size_t size,
3404                      struct GNUNET_TIME_Relative timeout,
3405                      GNUNET_STREAM_CompletionContinuation write_cont,
3406                      void *write_cont_cls)
3407 {
3408   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3409   struct GNUNET_STREAM_DataMessage *data_msg;
3410   const void *sweep;
3411   struct GNUNET_TIME_Relative ack_deadline;
3412   unsigned int num_needed_packets;
3413   unsigned int packet;
3414   uint32_t packet_size;
3415   uint32_t payload_size;
3416   uint16_t max_data_packet_size;
3417
3418   LOG (GNUNET_ERROR_TYPE_DEBUG,
3419        "%s\n", __func__);
3420   if (NULL != socket->write_handle)
3421   {
3422     GNUNET_break (0);
3423     return NULL;
3424   }
3425   switch (socket->state)
3426   {
3427   case STATE_TRANSMIT_CLOSED:
3428   case STATE_TRANSMIT_CLOSE_WAIT:
3429   case STATE_CLOSED:
3430   case STATE_CLOSE_WAIT:
3431     if (NULL != write_cont)
3432       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3433     LOG (GNUNET_ERROR_TYPE_DEBUG,
3434          "%s() END\n", __func__);
3435     return NULL;
3436   case STATE_INIT:
3437   case STATE_LISTEN:
3438   case STATE_HELLO_WAIT:
3439     if (NULL != write_cont)
3440       /* FIXME: GNUNET_STREAM_SYSERR?? */
3441       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3442     LOG (GNUNET_ERROR_TYPE_DEBUG,
3443          "%s() END\n", __func__);
3444     return NULL;
3445   case STATE_ESTABLISHED:
3446   case STATE_RECEIVE_CLOSED:
3447   case STATE_RECEIVE_CLOSE_WAIT:
3448     break;
3449   }
3450   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3451     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3452   num_needed_packets =
3453       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3454   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3455   io_handle->socket = socket;
3456   io_handle->write_cont = write_cont;
3457   io_handle->write_cont_cls = write_cont_cls;
3458   io_handle->size = size;
3459   io_handle->packets_sent = 0;
3460   sweep = data;
3461   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3462      determined from RTT */
3463   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3464   /* Divide the given buffer into packets for sending */
3465   max_data_packet_size =
3466       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3467   for (packet=0; packet < num_needed_packets; packet++)
3468   {
3469     if ((packet + 1) * socket->max_payload_size < size) 
3470     {
3471       payload_size = socket->max_payload_size;
3472       packet_size = max_data_packet_size;
3473     }
3474     else 
3475     {
3476       payload_size = size - packet * socket->max_payload_size;
3477       packet_size = 
3478           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3479     }
3480     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3481     io_handle->messages[packet]->header.header.size = htons (packet_size);
3482     io_handle->messages[packet]->header.header.type =
3483       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3484     io_handle->messages[packet]->sequence_number =
3485       htonl (socket->write_sequence_number++);
3486     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3487     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3488        determined from RTT */
3489     io_handle->messages[packet]->ack_deadline =
3490       GNUNET_TIME_relative_hton (ack_deadline);
3491     data_msg = io_handle->messages[packet];
3492     /* Copy data from given buffer to the packet */
3493     memcpy (&data_msg[1], sweep, payload_size);
3494     sweep += payload_size;
3495     socket->write_offset += payload_size;
3496   }
3497   /* ack the last data message. FIXME: remove when we figure out how to do this
3498      using RTT */
3499   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3500       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3501   socket->write_handle = io_handle;
3502   write_data (socket);
3503   LOG (GNUNET_ERROR_TYPE_DEBUG,
3504        "%s() END\n", __func__);
3505   return io_handle;
3506 }
3507
3508
3509 /**
3510  * Tries to read data from the stream.
3511  *
3512  * @param socket the socket representing a stream
3513  * @param timeout the timeout period
3514  * @param proc function to call with data (once only)
3515  * @param proc_cls the closure for proc
3516  *
3517  * @return handle to cancel the operation; NULL is returned if: the stream has
3518  *           been shutdown for this type of opeartion (the DataProcessor is
3519  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3520  *           read handle is present (only one read handle per socket is present
3521  *           at any time)
3522  */
3523 struct GNUNET_STREAM_IOReadHandle *
3524 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3525                     struct GNUNET_TIME_Relative timeout,
3526                     GNUNET_STREAM_DataProcessor proc,
3527                     void *proc_cls)
3528 {
3529   struct GNUNET_STREAM_IOReadHandle *read_handle;
3530   
3531   LOG (GNUNET_ERROR_TYPE_DEBUG,
3532        "%s: %s()\n", 
3533        GNUNET_i2s (&socket->other_peer),
3534        __func__);
3535   /* Return NULL if there is already a read handle; the user has to cancel that
3536      first before continuing or has to wait until it is completed */
3537   if (NULL != socket->read_handle) 
3538     return NULL;
3539   GNUNET_assert (NULL != proc);
3540   switch (socket->state)
3541   {
3542   case STATE_RECEIVE_CLOSED:
3543   case STATE_RECEIVE_CLOSE_WAIT:
3544   case STATE_CLOSED:
3545   case STATE_CLOSE_WAIT:
3546     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3547     LOG (GNUNET_ERROR_TYPE_DEBUG,
3548          "%s: %s() END\n",
3549          GNUNET_i2s (&socket->other_peer),
3550          __func__);
3551     return NULL;
3552   default:
3553     break;
3554   }
3555   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3556   read_handle->proc = proc;
3557   read_handle->proc_cls = proc_cls;
3558   read_handle->socket = socket;
3559   socket->read_handle = read_handle;
3560   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3561                                           0))
3562     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3563                                                           socket);   
3564   read_handle->read_io_timeout_task_id =
3565       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3566   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3567        GNUNET_i2s (&socket->other_peer), __func__);
3568   return read_handle;
3569 }
3570
3571
3572 /**
3573  * Cancel pending write operation.
3574  *
3575  * @param ioh handle to operation to cancel
3576  */
3577 void
3578 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3579 {
3580   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3581   unsigned int packet;
3582
3583   GNUNET_assert (NULL != socket->write_handle);
3584   GNUNET_assert (socket->write_handle == ioh);
3585
3586   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3587   {
3588     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3589     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3590   }
3591
3592   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3593   {
3594     if (NULL == ioh->messages[packet]) break;
3595     GNUNET_free (ioh->messages[packet]);
3596   }
3597       
3598   GNUNET_free (socket->write_handle);
3599   socket->write_handle = NULL;
3600 }
3601
3602
3603 /**
3604  * Cancel pending read operation.
3605  *
3606  * @param ioh handle to operation to cancel
3607  */
3608 void
3609 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3610 {
3611   struct GNUNET_STREAM_Socket *socket;
3612   
3613   socket = ioh->socket;
3614   GNUNET_assert (NULL != socket->read_handle);
3615   GNUNET_assert (ioh == socket->read_handle);
3616   /* Read io time task should be there; if it is already executed then this
3617   read handle is not valid; However upon scheduler shutdown the read io task
3618   may be executed before */
3619   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3620     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3621   /* reading task may be present; if so we have to stop it */
3622   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3623     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3624   GNUNET_free (ioh);
3625   socket->read_handle = NULL;
3626 }
3627
3628 /* end of stream_api.c */