use statistics
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_statistics_service.h"
43 #include "gnunet_stream_lib.h"
44 #include "stream_protocol.h"
45
46 /**
47  * Generic logging shorthand
48  */
49 #define LOG(kind,...)                                   \
50   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
51
52 /**
53  * Debug logging shorthand
54  */
55 #define LOG_DEBUG(...)                          \
56   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
57
58 /**
59  * Time in relative seconds shorthand
60  */
61 #define TIME_REL_SECS(sec) \
62   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
63
64 /**
65  * The maximum packet size of a stream packet
66  */
67 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
68
69 /**
70  * Receive buffer
71  */
72 #define RECEIVE_BUFFER_SIZE 4096000
73
74 /**
75  * states in the Protocol
76  */
77 enum State
78   {
79     /**
80      * Client initialization state
81      */
82     STATE_INIT,
83
84     /**
85      * Listener initialization state 
86      */
87     STATE_LISTEN,
88
89     /**
90      * Pre-connection establishment state
91      */
92     STATE_HELLO_WAIT,
93
94     /**
95      * State where a connection has been established
96      */
97     STATE_ESTABLISHED,
98
99     /**
100      * State where the socket is closed on our side and waiting to be ACK'ed
101      */
102     STATE_RECEIVE_CLOSE_WAIT,
103
104     /**
105      * State where the socket is closed for reading
106      */
107     STATE_RECEIVE_CLOSED,
108
109     /**
110      * State where the socket is closed on our side and waiting to be ACK'ed
111      */
112     STATE_TRANSMIT_CLOSE_WAIT,
113
114     /**
115      * State where the socket is closed for writing
116      */
117     STATE_TRANSMIT_CLOSED,
118
119     /**
120      * State where the socket is closed on our side and waiting to be ACK'ed
121      */
122     STATE_CLOSE_WAIT,
123
124     /**
125      * State where the socket is closed
126      */
127     STATE_CLOSED 
128   };
129
130
131 /**
132  * Functions of this type are called when a message is written
133  *
134  * @param cls the closure from queue_message
135  * @param socket the socket the written message was bound to
136  */
137 typedef void (*SendFinishCallback) (void *cls,
138                                     struct GNUNET_STREAM_Socket *socket);
139
140
141 /**
142  * The send message queue
143  */
144 struct MessageQueue
145 {
146   /**
147    * The message
148    */
149   struct GNUNET_STREAM_MessageHeader *message;
150
151   /**
152    * Callback to be called when the message is sent
153    */
154   SendFinishCallback finish_cb;
155
156   /**
157    * The closure for finish_cb
158    */
159   void *finish_cb_cls;
160
161   /**
162    * The next message in queue. Should be NULL in the last message
163    */
164   struct MessageQueue *next;
165
166   /**
167    * The next message in queue. Should be NULL in the first message
168    */
169   struct MessageQueue *prev;
170 };
171
172
173 /**
174  * The STREAM Socket Handler
175  */
176 struct GNUNET_STREAM_Socket
177 {
178   /**
179    * The mesh handle
180    */
181   struct GNUNET_MESH_Handle *mesh;
182
183   /**
184    * Handle to statistics
185    */
186   struct GNUNET_STATISTICS_Handle *stat_handle;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * The shutdown handle associated with this socket
230    */
231   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
232
233   /**
234    * Buffer for storing received messages
235    */
236   void *receive_buffer;
237
238   /**
239    * The listen socket from which this socket is derived. Should be NULL if it
240    * is not a derived socket
241    */
242   struct GNUNET_STREAM_ListenSocket *lsocket;
243
244   /**
245    * The peer identity of the peer at the other end of the stream
246    */
247   struct GNUNET_PeerIdentity other_peer;
248
249   /**
250    * The Acknowledgement Bitmap
251    */
252   GNUNET_STREAM_AckBitmap ack_bitmap;
253
254   /**
255    * Task identifier for retransmission task after timeout
256    */
257   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
258
259   /**
260    * Task identifier for retransmission of control messages
261    */
262   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
263
264   /**
265    * The task for sending timely Acks
266    */
267   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
268
269   /**
270    * Retransmission timeout
271    */
272   struct GNUNET_TIME_Relative retransmit_timeout;
273
274   /**
275    * Time when the Acknowledgement was queued
276    */
277   struct GNUNET_TIME_Absolute ack_time_registered;
278
279   /**
280    * Queued Acknowledgement deadline
281    */
282   struct GNUNET_TIME_Relative ack_time_deadline;
283
284   /**
285    * The state of the protocol associated with this socket
286    */
287   enum State state;
288
289   /**
290    * The status of the socket
291    */
292   enum GNUNET_STREAM_Status status;
293
294   /**
295    * The number of previous timeouts; FIXME: currently not used
296    */
297   unsigned int retries;
298
299   /**
300    * Whether testing mode is active or not
301    */
302   int testing_active;
303
304   /**
305    * The application port number (type: uint32_t)
306    */
307   GNUNET_MESH_ApplicationType app_port;
308
309   /**
310    * The write sequence number to be set incase of testing
311    */
312   uint32_t testing_set_write_sequence_number_value;
313
314   /**
315    * Write sequence number. Set to random when sending HELLO(client) and
316    * HELLO_ACK(server) 
317    */
318   uint32_t write_sequence_number;
319
320   /**
321    * Read sequence number. This number's value is determined during handshake
322    */
323   uint32_t read_sequence_number;
324
325   /**
326    * The receiver buffer size
327    */
328   uint32_t receive_buffer_size;
329
330   /**
331    * The receiver buffer boundaries
332    */
333   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
334
335   /**
336    * receiver's available buffer after the last acknowledged packet
337    */
338   uint32_t receiver_window_available;
339
340   /**
341    * The offset pointer used during write operation
342    */
343   uint32_t write_offset;
344
345   /**
346    * The offset after which we are expecting data
347    */
348   uint32_t read_offset;
349
350   /**
351    * The offset upto which user has read from the received buffer
352    */
353   uint32_t copy_offset;
354
355   /**
356    * The maximum size of the data message payload this stream handle can send
357    */
358   uint16_t max_payload_size;
359 };
360
361
362 /**
363  * A socket for listening
364  */
365 struct GNUNET_STREAM_ListenSocket
366 {
367   /**
368    * The mesh handle
369    */
370   struct GNUNET_MESH_Handle *mesh;
371
372   /**
373    * Handle to statistics
374    */
375   struct GNUNET_STATISTICS_Handle *stat_handle;
376
377   /**
378    * Our configuration
379    */
380   struct GNUNET_CONFIGURATION_Handle *cfg;
381
382   /**
383    * Handle to the lock manager service
384    */
385   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
386
387   /**
388    * The active LockingRequest from lockmanager
389    */
390   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
391
392   /**
393    * Callback to call after acquring a lock and listening
394    */
395   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
396
397   /**
398    * The callback function which is called after successful opening socket
399    */
400   GNUNET_STREAM_ListenCallback listen_cb;
401
402   /**
403    * The call back closure
404    */
405   void *listen_cb_cls;
406
407   /**
408    * The service port
409    */
410   GNUNET_MESH_ApplicationType port;
411   
412   /**
413    * The id of the lockmanager timeout task
414    */
415   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
416
417   /**
418    * The retransmit timeout
419    */
420   struct GNUNET_TIME_Relative retransmit_timeout;
421   
422   /**
423    * Listen enabled?
424    */
425   int listening;
426
427   /**
428    * Whether testing mode is active or not
429    */
430   int testing_active;
431
432   /**
433    * The write sequence number to be set incase of testing
434    */
435   uint32_t testing_set_write_sequence_number_value;
436
437   /**
438    * The maximum size of the data message payload this stream handle can send
439    */
440   uint16_t max_payload_size;
441
442 };
443
444
445 /**
446  * The IO Write Handle
447  */
448 struct GNUNET_STREAM_IOWriteHandle
449 {
450   /**
451    * The socket to which this write handle is associated
452    */
453   struct GNUNET_STREAM_Socket *socket;
454
455   /**
456    * The packet_buffers associated with this Handle
457    */
458   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
459
460   /**
461    * The write continuation callback
462    */
463   GNUNET_STREAM_CompletionContinuation write_cont;
464
465   /**
466    * Write continuation closure
467    */
468   void *write_cont_cls;
469
470   /**
471    * The bitmap of this IOHandle; Corresponding bit for a message is set when
472    * it has been acknowledged by the receiver
473    */
474   GNUNET_STREAM_AckBitmap ack_bitmap;
475
476   /**
477    * Number of bytes in this write handle
478    */
479   size_t size;
480
481   /**
482    * Number of packets already transmitted from this IO handle. Retransmitted
483    * packets are not taken into account here. This is used to determine which
484    * packets account for retransmission and which packets occupy buffer space at
485    * the receiver.
486    */
487   unsigned int packets_sent;
488 };
489
490
491 /**
492  * The IO Read Handle
493  */
494 struct GNUNET_STREAM_IOReadHandle
495 {
496   /**
497    * The socket to which this read handle is associated
498    */
499   struct GNUNET_STREAM_Socket *socket;
500   
501   /**
502    * Callback for the read processor
503    */
504   GNUNET_STREAM_DataProcessor proc;
505
506   /**
507    * The closure pointer for the read processor callback
508    */
509   void *proc_cls;
510
511   /**
512    * Task identifier for the read io timeout task
513    */
514   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
515
516   /**
517    * Task scheduled to continue a read operation.
518    */
519   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
520 };
521
522
523 /**
524  * Handle for Shutdown
525  */
526 struct GNUNET_STREAM_ShutdownHandle
527 {
528   /**
529    * The socket associated with this shutdown handle
530    */
531   struct GNUNET_STREAM_Socket *socket;
532
533   /**
534    * Shutdown completion callback
535    */
536   GNUNET_STREAM_ShutdownCompletion completion_cb;
537
538   /**
539    * Closure for completion callback
540    */
541   void *completion_cls;
542
543   /**
544    * Close message retransmission task id
545    */
546   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
547
548   /**
549    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
550    */
551   int operation;  
552 };
553
554
555 /**
556  * Default value in seconds for various timeouts
557  */
558 static const unsigned int default_timeout = 10;
559
560 /**
561  * The domain name for locks we use here
562  */
563 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
564
565
566 /**
567  * Callback function for sending queued message
568  *
569  * @param cls closure the socket
570  * @param size number of bytes available in buf
571  * @param buf where the callee should write the message
572  * @return number of bytes written to buf
573  */
574 static size_t
575 send_message_notify (void *cls, size_t size, void *buf)
576 {
577   struct GNUNET_STREAM_Socket *socket = cls;
578   struct MessageQueue *head;
579   size_t ret;
580
581   socket->transmit_handle = NULL; /* Remove the transmit handle */
582   head = socket->queue_head;
583   if (NULL == head)
584     return 0; /* just to be safe */
585   if (0 == size)                /* request timed out */
586   {
587     socket->retries++;
588     LOG (GNUNET_ERROR_TYPE_DEBUG,
589          "%s: Message sending timed out. Retry %d \n",
590          GNUNET_i2s (&socket->other_peer),
591          socket->retries);
592     socket->transmit_handle = 
593       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
594                                          GNUNET_NO, /* Corking */
595                                          /* FIXME: exponential backoff */
596                                          socket->retransmit_timeout,
597                                          &socket->other_peer,
598                                          ntohs (head->message->header.size),
599                                          &send_message_notify,
600                                          socket);
601     return 0;
602   }
603   ret = ntohs (head->message->header.size);
604   GNUNET_assert (size >= ret);
605   memcpy (buf, head->message, ret);
606   if (NULL != head->finish_cb)
607   {
608     head->finish_cb (head->finish_cb_cls, socket);
609   }
610   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
611                                socket->queue_tail,
612                                head);
613   GNUNET_free (head->message);
614   GNUNET_free (head);
615   head = socket->queue_head;
616   if (NULL != head)    /* more pending messages to send */
617   {
618     socket->retries = 0;
619     socket->transmit_handle = 
620       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
621                                          GNUNET_NO, /* Corking */
622                                          /* FIXME: exponential backoff */
623                                          socket->retransmit_timeout,
624                                          &socket->other_peer,
625                                          ntohs (head->message->header.size),
626                                          &send_message_notify,
627                                          socket);
628   }
629   return ret;
630 }
631
632
633 /**
634  * Queues a message for sending using the mesh connection of a socket
635  *
636  * @param socket the socket whose mesh connection is used
637  * @param message the message to be sent
638  * @param finish_cb the callback to be called when the message is sent
639  * @param finish_cb_cls the closure for the callback
640  * @param urgent set to GNUNET_YES to add the message to the beginning of the
641  *          queue; GNUNET_NO to add at the tail
642  */
643 static void
644 queue_message (struct GNUNET_STREAM_Socket *socket,
645                struct GNUNET_STREAM_MessageHeader *message,
646                SendFinishCallback finish_cb,
647                void *finish_cb_cls,
648                int urgent)
649 {
650   struct MessageQueue *queue_entity;
651
652   GNUNET_assert 
653     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
654      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
655   LOG (GNUNET_ERROR_TYPE_DEBUG,
656        "%s: Queueing message of type %d and size %d\n",
657        GNUNET_i2s (&socket->other_peer),
658        ntohs (message->header.type),
659        ntohs (message->header.size));
660   GNUNET_assert (NULL != message);
661   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
662   queue_entity->message = message;
663   queue_entity->finish_cb = finish_cb;
664   queue_entity->finish_cb_cls = finish_cb_cls;
665   if (GNUNET_YES == urgent)
666   {
667     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
668                                  queue_entity);
669     if (NULL != socket->transmit_handle)
670     {
671       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
672       socket->transmit_handle = NULL;
673     }
674   }
675   else
676     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
677                                       socket->queue_tail,
678                                       queue_entity);
679   if (NULL == socket->transmit_handle)
680   {
681     socket->retries = 0;
682     socket->transmit_handle = 
683         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
684                                            GNUNET_NO, /* Corking */
685                                            socket->retransmit_timeout,
686                                            &socket->other_peer,
687                                            ntohs (message->header.size),
688                                            &send_message_notify,
689                                            socket);
690   }
691 }
692
693
694 /**
695  * Copies a message and queues it for sending using the mesh connection of
696  * given socket 
697  *
698  * @param socket the socket whose mesh connection is used
699  * @param message the message to be sent
700  * @param finish_cb the callback to be called when the message is sent
701  * @param finish_cb_cls the closure for the callback
702  */
703 static void
704 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
705                         const struct GNUNET_STREAM_MessageHeader *message,
706                         SendFinishCallback finish_cb,
707                         void *finish_cb_cls)
708 {
709   struct GNUNET_STREAM_MessageHeader *msg_copy;
710   uint16_t size;
711   
712   size = ntohs (message->header.size);
713   msg_copy = GNUNET_malloc (size);
714   memcpy (msg_copy, message, size);
715   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
716 }
717
718
719 /**
720  * Writes data using the given socket. The amount of data written is limited by
721  * the receiver_window_size
722  *
723  * @param socket the socket to use
724  */
725 static void 
726 write_data (struct GNUNET_STREAM_Socket *socket);
727
728
729 /**
730  * Task for retransmitting data messages if they aren't ACK before their ack
731  * deadline 
732  *
733  * @param cls the socket
734  * @param tc the Task context
735  */
736 static void
737 data_retransmission_task (void *cls,
738                           const struct GNUNET_SCHEDULER_TaskContext *tc)
739 {
740   struct GNUNET_STREAM_Socket *socket = cls;
741   
742   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
743   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
744     return;
745   LOG (GNUNET_ERROR_TYPE_DEBUG,
746        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
747   write_data (socket);
748 }
749
750
751 /**
752  * Task for sending ACK message
753  *
754  * @param cls the socket
755  * @param tc the Task context
756  */
757 static void
758 ack_task (void *cls,
759           const struct GNUNET_SCHEDULER_TaskContext *tc)
760 {
761   struct GNUNET_STREAM_Socket *socket = cls;
762   struct GNUNET_STREAM_AckMessage *ack_msg;
763
764   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
765   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
766     return;
767   /* Create the ACK Message */
768   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
769   ack_msg->header.header.size = htons (sizeof (struct 
770                                                GNUNET_STREAM_AckMessage));
771   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
772   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
773   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
774   ack_msg->receive_window_remaining = 
775     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
776   /* Queue up ACK for immediate sending */
777   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
778 }
779
780
781 /**
782  * Retransmission task for shutdown messages
783  *
784  * @param cls the shutdown handle
785  * @param tc the Task Context
786  */
787 static void
788 close_msg_retransmission_task (void *cls,
789                                const struct GNUNET_SCHEDULER_TaskContext *tc)
790 {
791   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
792   struct GNUNET_STREAM_MessageHeader *msg;
793   struct GNUNET_STREAM_Socket *socket;
794
795   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
796   GNUNET_assert (NULL != shutdown_handle);
797   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
798     return;
799   socket = shutdown_handle->socket;
800   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
801   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
802   switch (shutdown_handle->operation)
803   {
804   case SHUT_RDWR:
805     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
806     break;
807   case SHUT_RD:
808     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
809     break;
810   case SHUT_WR:
811     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
812     break;
813   default:
814     GNUNET_free (msg);
815     shutdown_handle->close_msg_retransmission_task_id = 
816       GNUNET_SCHEDULER_NO_TASK;
817     return;
818   }
819   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
820   shutdown_handle->close_msg_retransmission_task_id =
821     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
822                                   &close_msg_retransmission_task,
823                                   shutdown_handle);
824 }
825
826
827 /**
828  * Function to modify a bit in GNUNET_STREAM_AckBitmap
829  *
830  * @param bitmap the bitmap to modify
831  * @param bit the bit number to modify
832  * @param value GNUNET_YES to on, GNUNET_NO to off
833  */
834 static void
835 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
836                       unsigned int bit, 
837                       int value)
838 {
839   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
840   if (GNUNET_YES == value)
841     *bitmap |= (1LL << bit);
842   else
843     *bitmap &= ~(1LL << bit);
844 }
845
846
847 /**
848  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
849  *
850  * @param bitmap address of the bitmap that has to be checked
851  * @param bit the bit number to check
852  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
853  */
854 static uint8_t
855 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
856                       unsigned int bit)
857 {
858   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
859   return 0 != (*bitmap & (1LL << bit));
860 }
861
862
863 /**
864  * Writes data using the given socket. The amount of data written is limited by
865  * the receiver_window_size
866  *
867  * @param socket the socket to use
868  */
869 static void 
870 write_data (struct GNUNET_STREAM_Socket *socket)
871 {
872   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
873   unsigned int packet;
874   
875   for (packet=0; packet < io_handle->packets_sent; packet++)
876   {
877     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
878                                            packet))
879     {
880       LOG (GNUNET_ERROR_TYPE_DEBUG,
881            "%s: Retransmitting DATA message with sequence %u\n",
882            GNUNET_i2s (&socket->other_peer),
883            ntohl (io_handle->messages[packet]->sequence_number));
884       copy_and_queue_message (socket,
885                               &io_handle->messages[packet]->header,
886                               NULL,
887                               NULL);
888     }
889   }
890   /* Now send new packets if there is enough buffer space */
891   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
892          (NULL != io_handle->messages[packet]) &&
893          (socket->receiver_window_available 
894           >= ntohs (io_handle->messages[packet]->header.header.size)))
895   {
896     socket->receiver_window_available -= 
897       ntohs (io_handle->messages[packet]->header.header.size);
898     LOG (GNUNET_ERROR_TYPE_DEBUG,
899          "%s: Placing DATA message with sequence %u in send queue\n",
900          GNUNET_i2s (&socket->other_peer),
901          ntohl (io_handle->messages[packet]->sequence_number));
902     copy_and_queue_message (socket,
903                             &io_handle->messages[packet]->header,
904                             NULL,
905                             NULL);
906     packet++;
907   }
908   io_handle->packets_sent = packet;
909   // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
910   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
911     socket->data_retransmission_task_id = 
912       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
913                                     (GNUNET_TIME_UNIT_SECONDS, 8),
914                                     &data_retransmission_task,
915                                     socket);
916 }
917
918
919 /**
920  * Task for calling the read processor
921  *
922  * @param cls the socket
923  * @param tc the task context
924  */
925 static void
926 call_read_processor (void *cls,
927                      const struct GNUNET_SCHEDULER_TaskContext *tc)
928 {
929   struct GNUNET_STREAM_Socket *socket = cls;
930   struct GNUNET_STREAM_IOReadHandle *read_handle;
931   size_t read_size;
932   size_t valid_read_size;
933   unsigned int packet;
934   uint32_t sequence_increase;
935   uint32_t offset_increase;
936
937   read_handle = socket->read_handle;
938   GNUNET_assert (NULL != read_handle);
939   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
940   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
941     return;
942   if (NULL == socket->receive_buffer) 
943     return;
944   GNUNET_assert (NULL != socket->read_handle);
945   GNUNET_assert (NULL != socket->read_handle->proc);
946   /* Check the bitmap for any holes */
947   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
948   {
949     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
950                                            packet))
951       break;
952   }
953   /* We only call read processor if we have the first packet */
954   GNUNET_assert (0 < packet);
955   valid_read_size = 
956     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
957   GNUNET_assert (0 != valid_read_size);
958   /* Cancel the read_io_timeout_task */
959   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
960   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
961   /* Call the data processor */
962   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
963        GNUNET_i2s (&socket->other_peer));
964   read_size = 
965       socket->read_handle->proc (socket->read_handle->proc_cls,
966                                  socket->status,
967                                  socket->receive_buffer + socket->copy_offset,
968                                  valid_read_size);
969   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
970        GNUNET_i2s (&socket->other_peer), read_size);
971   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
972        GNUNET_i2s (&socket->other_peer));
973   /* Free the read handle */
974   GNUNET_free (socket->read_handle);
975   socket->read_handle = NULL;
976   GNUNET_assert (read_size <= valid_read_size);
977   socket->copy_offset += read_size;
978   /* Determine upto which packet we can remove from the buffer */
979   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
980   {
981     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
982     { 
983       packet++; 
984       break;
985     }
986     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
987       break;
988   }
989   /* If no packets can be removed we can't move the buffer */
990   if (0 == packet) 
991     return;
992   sequence_increase = packet;
993   LOG (GNUNET_ERROR_TYPE_DEBUG,
994        "%s: Sequence increase after read processor completion: %u\n",
995        GNUNET_i2s (&socket->other_peer), sequence_increase);
996   /* Shift the data in the receive buffer */
997   socket->receive_buffer = 
998     memmove (socket->receive_buffer,
999              socket->receive_buffer 
1000              + socket->receive_buffer_boundaries[sequence_increase-1],
1001              socket->receive_buffer_size
1002              - socket->receive_buffer_boundaries[sequence_increase-1]);
1003   /* Shift the bitmap */
1004   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1005   /* Set read_sequence_number */
1006   socket->read_sequence_number += sequence_increase;
1007   /* Set read_offset */
1008   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1009   socket->read_offset += offset_increase;
1010   /* Fix copy_offset */
1011   GNUNET_assert (offset_increase <= socket->copy_offset);
1012   socket->copy_offset -= offset_increase;
1013   /* Fix relative boundaries */
1014   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1015   {
1016     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1017     {
1018       uint32_t ahead_buffer_boundary;
1019
1020       ahead_buffer_boundary = 
1021         socket->receive_buffer_boundaries[packet + sequence_increase];
1022       if (0 == ahead_buffer_boundary)
1023         socket->receive_buffer_boundaries[packet] = 0;
1024       else
1025       {
1026         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1027         socket->receive_buffer_boundaries[packet] = 
1028           ahead_buffer_boundary - offset_increase;
1029       }
1030     }
1031     else
1032       socket->receive_buffer_boundaries[packet] = 0;
1033   }
1034 }
1035
1036
1037 /**
1038  * Cancels the existing read io handle
1039  *
1040  * @param cls the closure from the SCHEDULER call
1041  * @param tc the task context
1042  */
1043 static void
1044 read_io_timeout (void *cls,
1045                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1046 {
1047   struct GNUNET_STREAM_Socket *socket = cls;
1048   struct GNUNET_STREAM_IOReadHandle *read_handle;
1049   GNUNET_STREAM_DataProcessor proc;
1050   void *proc_cls;
1051
1052   read_handle = socket->read_handle;
1053   GNUNET_assert (NULL != read_handle);
1054   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1055   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1056     return;
1057   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1058   {
1059     LOG (GNUNET_ERROR_TYPE_DEBUG,
1060          "%s: Read task timedout - Cancelling it\n",
1061          GNUNET_i2s (&socket->other_peer));
1062     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1063     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1064   }
1065   proc = read_handle->proc;
1066   proc_cls = read_handle->proc_cls;
1067   GNUNET_free (read_handle);
1068   socket->read_handle = NULL;
1069   /* Call the read processor to signal timeout */
1070   proc (proc_cls,
1071         GNUNET_STREAM_TIMEOUT,
1072         NULL,
1073         0);
1074 }
1075
1076
1077 /**
1078  * Handler for DATA messages; Same for both client and server
1079  *
1080  * @param socket the socket through which the ack was received
1081  * @param tunnel connection to the other end
1082  * @param sender who sent the message
1083  * @param msg the data message
1084  * @param atsi performance data for the connection
1085  * @return GNUNET_OK to keep the connection open,
1086  *         GNUNET_SYSERR to close it (signal serious error)
1087  */
1088 static int
1089 handle_data (struct GNUNET_STREAM_Socket *socket,
1090              struct GNUNET_MESH_Tunnel *tunnel,
1091              const struct GNUNET_PeerIdentity *sender,
1092              const struct GNUNET_STREAM_DataMessage *msg,
1093              const struct GNUNET_ATS_Information*atsi)
1094 {
1095   const void *payload;
1096   struct GNUNET_TIME_Relative ack_deadline_rel;
1097   uint32_t bytes_needed;
1098   uint32_t relative_offset;
1099   uint32_t relative_sequence_number;
1100   uint16_t size;
1101
1102   size = htons (msg->header.header.size);
1103   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1104   {
1105     GNUNET_break_op (0);
1106     return GNUNET_SYSERR;
1107   }
1108   if (0 != memcmp (sender, &socket->other_peer,
1109                    sizeof (struct GNUNET_PeerIdentity)))
1110   {
1111     LOG (GNUNET_ERROR_TYPE_DEBUG,
1112          "%s: Received DATA from non-confirming peer\n",
1113          GNUNET_i2s (&socket->other_peer));
1114     return GNUNET_YES;
1115   }
1116   switch (socket->state)
1117   {
1118   case STATE_ESTABLISHED:
1119   case STATE_TRANSMIT_CLOSED:
1120   case STATE_TRANSMIT_CLOSE_WAIT:      
1121     /* check if the message's sequence number is in the range we are
1122        expecting */
1123     relative_sequence_number = 
1124       ntohl (msg->sequence_number) - socket->read_sequence_number;
1125     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1126     {
1127       LOG (GNUNET_ERROR_TYPE_DEBUG,
1128            "%s: Ignoring received message with sequence number %u\n",
1129            GNUNET_i2s (&socket->other_peer),
1130            ntohl (msg->sequence_number));
1131       /* Start ACK sending task if one is not already present */
1132       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1133       {
1134         socket->ack_task_id = 
1135           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1136                                         (msg->ack_deadline),
1137                                         &ack_task,
1138                                         socket);
1139       }
1140       return GNUNET_YES;
1141     }      
1142     /* Check if we have already seen this message */
1143     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1144                                             relative_sequence_number))
1145     {
1146       LOG (GNUNET_ERROR_TYPE_DEBUG,
1147            "%s: Ignoring already received message with sequence number %u\n",
1148            GNUNET_i2s (&socket->other_peer),
1149            ntohl (msg->sequence_number));
1150       /* Start ACK sending task if one is not already present */
1151       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1152       {
1153         socket->ack_task_id = 
1154           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1155                                         (msg->ack_deadline), &ack_task, socket);
1156       }
1157       return GNUNET_YES;
1158     }
1159     LOG (GNUNET_ERROR_TYPE_DEBUG,
1160          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1161          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1162          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1163     /* Check if we have to allocate the buffer */
1164     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1165     relative_offset = ntohl (msg->offset) - socket->read_offset;
1166     bytes_needed = relative_offset + size;
1167     if (bytes_needed > socket->receive_buffer_size)
1168     {
1169       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1170       {
1171         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1172                                                  bytes_needed);
1173         socket->receive_buffer_size = bytes_needed;
1174       }
1175       else
1176       {
1177         LOG (GNUNET_ERROR_TYPE_DEBUG,
1178              "%s: Cannot accommodate packet %d as buffer is full\n",
1179              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1180         return GNUNET_YES;
1181       }
1182     }
1183     /* Copy Data to buffer */
1184     payload = &msg[1];
1185     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1186     memcpy (socket->receive_buffer + relative_offset, payload, size);
1187     socket->receive_buffer_boundaries[relative_sequence_number] = 
1188         relative_offset + size;
1189     /* Modify the ACK bitmap */
1190     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1191                           GNUNET_YES);
1192     /* Start ACK sending task if one is not already present */
1193     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1194     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1195     {
1196       ack_deadline_rel = 
1197           GNUNET_TIME_relative_min (ack_deadline_rel,
1198                                     GNUNET_TIME_relative_multiply
1199                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1200       socket->ack_task_id = 
1201           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1202                                         (msg->ack_deadline), &ack_task, socket);
1203       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1204       socket->ack_time_deadline = ack_deadline_rel;
1205     }
1206     else
1207     {
1208       struct GNUNET_TIME_Relative ack_time_past;
1209       struct GNUNET_TIME_Relative ack_time_remaining;
1210       struct GNUNET_TIME_Relative ack_time_min;
1211       ack_time_past = 
1212           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1213       ack_time_remaining = GNUNET_TIME_relative_subtract
1214           (socket->ack_time_deadline, ack_time_past);
1215       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1216                                                ack_deadline_rel);
1217       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1218                       sizeof (struct GNUNET_TIME_Relative)))
1219       {
1220         ack_deadline_rel = ack_time_min;
1221         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1222         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1223                                                             &ack_task, socket);
1224         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1225         socket->ack_time_deadline = ack_deadline_rel;
1226       }
1227     }
1228     if ((NULL != socket->read_handle) /* A read handle is waiting */
1229         /* There is no current read task */
1230         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1231         /* We have the first packet */
1232         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1233     {
1234       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1235            GNUNET_i2s (&socket->other_peer));
1236       socket->read_handle->read_task_id =
1237           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1238     }
1239     break;
1240   default:
1241     LOG (GNUNET_ERROR_TYPE_DEBUG,
1242          "%s: Received data message when it cannot be handled\n",
1243          GNUNET_i2s (&socket->other_peer));
1244     break;
1245   }
1246   return GNUNET_YES;
1247 }
1248
1249
1250 /**
1251  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1252  *
1253  * @param cls the socket (set from GNUNET_MESH_connect)
1254  * @param tunnel connection to the other end
1255  * @param tunnel_ctx place to store local state associated with the tunnel
1256  * @param sender who sent the message
1257  * @param message the actual message
1258  * @param atsi performance data for the connection
1259  * @return GNUNET_OK to keep the connection open,
1260  *         GNUNET_SYSERR to close it (signal serious error)
1261  */
1262 static int
1263 client_handle_data (void *cls,
1264                     struct GNUNET_MESH_Tunnel *tunnel,
1265                     void **tunnel_ctx,
1266                     const struct GNUNET_PeerIdentity *sender,
1267                     const struct GNUNET_MessageHeader *message,
1268                     const struct GNUNET_ATS_Information*atsi)
1269 {
1270   struct GNUNET_STREAM_Socket *socket = cls;
1271
1272   return handle_data (socket, tunnel, sender, 
1273                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1274 }
1275
1276
1277 /**
1278  * Callback to set state to ESTABLISHED
1279  *
1280  * @param cls the closure NULL;
1281  * @param socket the socket to requiring state change
1282  */
1283 static void
1284 set_state_established (void *cls,
1285                        struct GNUNET_STREAM_Socket *socket)
1286 {
1287   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1288        "%s: Attaining ESTABLISHED state\n",
1289        GNUNET_i2s (&socket->other_peer));
1290   socket->write_offset = 0;
1291   socket->read_offset = 0;
1292   socket->state = STATE_ESTABLISHED;
1293   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1294                  socket->control_retransmission_task_id);
1295   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1296   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1297   if (NULL != socket->lsocket)
1298   {
1299     LOG (GNUNET_ERROR_TYPE_DEBUG,
1300          "%s: Calling listen callback\n",
1301          GNUNET_i2s (&socket->other_peer));
1302     if (GNUNET_SYSERR == 
1303         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1304                                     socket,
1305                                     &socket->other_peer))
1306     {
1307       socket->state = STATE_CLOSED;
1308       /* FIXME: We should close in a decent way (send RST) */
1309       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1310       GNUNET_free (socket);
1311     }
1312   }
1313   else
1314     socket->open_cb (socket->open_cls, socket);
1315 }
1316
1317
1318 /**
1319  * Callback to set state to HELLO_WAIT
1320  *
1321  * @param cls the closure from queue_message
1322  * @param socket the socket to requiring state change
1323  */
1324 static void
1325 set_state_hello_wait (void *cls,
1326                       struct GNUNET_STREAM_Socket *socket)
1327 {
1328   GNUNET_assert (STATE_INIT == socket->state);
1329   LOG (GNUNET_ERROR_TYPE_DEBUG,
1330        "%s: Attaining HELLO_WAIT state\n",
1331        GNUNET_i2s (&socket->other_peer));
1332   socket->state = STATE_HELLO_WAIT;
1333 }
1334
1335
1336 /**
1337  * Callback to set state to CLOSE_WAIT
1338  *
1339  * @param cls the closure from queue_message
1340  * @param socket the socket requiring state change
1341  */
1342 static void
1343 set_state_close_wait (void *cls,
1344                       struct GNUNET_STREAM_Socket *socket)
1345 {
1346   LOG (GNUNET_ERROR_TYPE_DEBUG,
1347        "%s: Attaing CLOSE_WAIT state\n",
1348        GNUNET_i2s (&socket->other_peer));
1349   socket->state = STATE_CLOSE_WAIT;
1350   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1351   socket->receive_buffer = NULL;
1352   socket->receive_buffer_size = 0;
1353 }
1354
1355
1356 /**
1357  * Callback to set state to RECEIVE_CLOSE_WAIT
1358  *
1359  * @param cls the closure from queue_message
1360  * @param socket the socket requiring state change
1361  */
1362 static void
1363 set_state_receive_close_wait (void *cls,
1364                               struct GNUNET_STREAM_Socket *socket)
1365 {
1366   LOG (GNUNET_ERROR_TYPE_DEBUG,
1367        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1368        GNUNET_i2s (&socket->other_peer));
1369   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1370   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1371   socket->receive_buffer = NULL;
1372   socket->receive_buffer_size = 0;
1373 }
1374
1375
1376 /**
1377  * Callback to set state to TRANSMIT_CLOSE_WAIT
1378  *
1379  * @param cls the closure from queue_message
1380  * @param socket the socket requiring state change
1381  */
1382 static void
1383 set_state_transmit_close_wait (void *cls,
1384                                struct GNUNET_STREAM_Socket *socket)
1385 {
1386   LOG (GNUNET_ERROR_TYPE_DEBUG,
1387        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1388        GNUNET_i2s (&socket->other_peer));
1389   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1390 }
1391
1392
1393 /**
1394  * Callback to set state to CLOSED
1395  *
1396  * @param cls the closure from queue_message
1397  * @param socket the socket requiring state change
1398  */
1399 static void
1400 set_state_closed (void *cls,
1401                   struct GNUNET_STREAM_Socket *socket)
1402 {
1403   socket->state = STATE_CLOSED;
1404 }
1405
1406
1407 /**
1408  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1409  *
1410  * @return the generate hello message
1411  */
1412 static struct GNUNET_STREAM_MessageHeader *
1413 generate_hello (void)
1414 {
1415   struct GNUNET_STREAM_MessageHeader *msg;
1416
1417   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1418   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1419   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1420   return msg;
1421 }
1422
1423
1424 /**
1425  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1426  * socket
1427  *
1428  * @param socket the socket for which this HelloAckMessage has to be generated
1429  * @param generate_seq GNUNET_YES to generate the write sequence number,
1430  *          GNUNET_NO to use the existing sequence number
1431  * @return the HelloAckMessage
1432  */
1433 static struct GNUNET_STREAM_HelloAckMessage *
1434 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1435                     int generate_seq)
1436 {
1437   struct GNUNET_STREAM_HelloAckMessage *msg;
1438
1439   if (GNUNET_YES == generate_seq)
1440   {
1441     if (GNUNET_YES == socket->testing_active)
1442       socket->write_sequence_number =
1443         socket->testing_set_write_sequence_number_value;
1444     else
1445       socket->write_sequence_number = 
1446         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1447     LOG_DEBUG ("%s: write sequence number %u\n",
1448                GNUNET_i2s (&socket->other_peer),
1449                (unsigned int) socket->write_sequence_number);
1450   }
1451   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1452   msg->header.header.size = 
1453     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1454   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1455   msg->sequence_number = htonl (socket->write_sequence_number);
1456   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1457   return msg;
1458 }
1459
1460
1461 /**
1462  * Task for retransmitting control messages if they aren't ACK'ed before a
1463  * deadline
1464  *
1465  * @param cls the socket
1466  * @param tc the Task context
1467  */
1468 static void
1469 control_retransmission_task (void *cls,
1470                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1471 {
1472   struct GNUNET_STREAM_Socket *socket = cls;
1473     
1474   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1475   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1476     return;
1477   LOG_DEBUG ("%s: Retransmitting a control message\n",
1478                  GNUNET_i2s (&socket->other_peer));
1479   switch (socket->state)
1480   {
1481   case STATE_INIT:    
1482     GNUNET_break (0);
1483     break;
1484   case STATE_LISTEN:
1485     GNUNET_break (0);
1486     break;
1487   case STATE_HELLO_WAIT:
1488     if (NULL == socket->lsocket) /* We are client */
1489       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1490     else
1491       queue_message (socket,
1492                      (struct GNUNET_STREAM_MessageHeader *)
1493                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1494                      GNUNET_NO);
1495     socket->control_retransmission_task_id =
1496     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1497                                   &control_retransmission_task, socket);
1498     break;
1499   case STATE_ESTABLISHED:
1500     if (NULL == socket->lsocket)
1501       queue_message (socket,
1502                      (struct GNUNET_STREAM_MessageHeader *)
1503                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1504                      GNUNET_NO);
1505     else
1506       GNUNET_break (0);
1507     break;
1508   default:
1509     GNUNET_break (0);
1510   }  
1511 }
1512
1513
1514 /**
1515  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1516  *
1517  * @param cls the socket (set from GNUNET_MESH_connect)
1518  * @param tunnel connection to the other end
1519  * @param tunnel_ctx this is NULL
1520  * @param sender who sent the message
1521  * @param message the actual message
1522  * @param atsi performance data for the connection
1523  * @return GNUNET_OK to keep the connection open,
1524  *         GNUNET_SYSERR to close it (signal serious error)
1525  */
1526 static int
1527 client_handle_hello_ack (void *cls,
1528                          struct GNUNET_MESH_Tunnel *tunnel,
1529                          void **tunnel_ctx,
1530                          const struct GNUNET_PeerIdentity *sender,
1531                          const struct GNUNET_MessageHeader *message,
1532                          const struct GNUNET_ATS_Information*atsi)
1533 {
1534   struct GNUNET_STREAM_Socket *socket = cls;
1535   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1536   struct GNUNET_STREAM_HelloAckMessage *reply;
1537
1538   if (0 != memcmp (sender, &socket->other_peer,
1539                    sizeof (struct GNUNET_PeerIdentity)))
1540   {
1541     LOG (GNUNET_ERROR_TYPE_DEBUG,
1542          "%s: Received HELLO_ACK from non-confirming peer\n",
1543          GNUNET_i2s (&socket->other_peer));
1544     return GNUNET_YES;
1545   }
1546   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1547   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1548        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1549   GNUNET_assert (socket->tunnel == tunnel);
1550   switch (socket->state)
1551   {
1552   case STATE_HELLO_WAIT:
1553     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1554     LOG (GNUNET_ERROR_TYPE_DEBUG,
1555          "%s: Read sequence number %u\n",
1556          GNUNET_i2s (&socket->other_peer),
1557          (unsigned int) socket->read_sequence_number);
1558     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1559     reply = generate_hello_ack (socket, GNUNET_YES);
1560     queue_message (socket, &reply->header, &set_state_established,
1561                    NULL, GNUNET_NO);    
1562     return GNUNET_OK;
1563   case STATE_ESTABLISHED:
1564     // call statistics (# ACKs ignored++)
1565     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1566                    socket->control_retransmission_task_id);
1567     socket->control_retransmission_task_id =
1568       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1569     return GNUNET_OK;
1570   default:
1571     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1572                GNUNET_i2s (&socket->other_peer),
1573                GNUNET_i2s (&socket->other_peer), socket->state);
1574     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1575     return GNUNET_SYSERR;
1576   }
1577 }
1578
1579
1580 /**
1581  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1582  *
1583  * @param cls the socket (set from GNUNET_MESH_connect)
1584  * @param tunnel connection to the other end
1585  * @param tunnel_ctx this is NULL
1586  * @param sender who sent the message
1587  * @param message the actual message
1588  * @param atsi performance data for the connection
1589  * @return GNUNET_OK to keep the connection open,
1590  *         GNUNET_SYSERR to close it (signal serious error)
1591  */
1592 static int
1593 client_handle_reset (void *cls,
1594                      struct GNUNET_MESH_Tunnel *tunnel,
1595                      void **tunnel_ctx,
1596                      const struct GNUNET_PeerIdentity *sender,
1597                      const struct GNUNET_MessageHeader *message,
1598                      const struct GNUNET_ATS_Information*atsi)
1599 {
1600   // struct GNUNET_STREAM_Socket *socket = cls;
1601
1602   return GNUNET_OK;
1603 }
1604
1605
1606 /**
1607  * Common message handler for handling TRANSMIT_CLOSE messages
1608  *
1609  * @param socket the socket through which the ack was received
1610  * @param tunnel connection to the other end
1611  * @param sender who sent the message
1612  * @param msg the transmit close message
1613  * @param atsi performance data for the connection
1614  * @return GNUNET_OK to keep the connection open,
1615  *         GNUNET_SYSERR to close it (signal serious error)
1616  */
1617 static int
1618 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1619                        struct GNUNET_MESH_Tunnel *tunnel,
1620                        const struct GNUNET_PeerIdentity *sender,
1621                        const struct GNUNET_STREAM_MessageHeader *msg,
1622                        const struct GNUNET_ATS_Information*atsi)
1623 {
1624   struct GNUNET_STREAM_MessageHeader *reply;
1625
1626   switch (socket->state)
1627   {
1628   case STATE_ESTABLISHED:
1629     socket->state = STATE_RECEIVE_CLOSED;
1630     /* Send TRANSMIT_CLOSE_ACK */
1631     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1632     reply->header.type = 
1633       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1634     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1635     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1636     break;
1637   default:
1638     /* FIXME: Call statistics? */
1639     break;
1640   }
1641   return GNUNET_YES;
1642 }
1643
1644
1645 /**
1646  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1647  *
1648  * @param cls the socket (set from GNUNET_MESH_connect)
1649  * @param tunnel connection to the other end
1650  * @param tunnel_ctx this is NULL
1651  * @param sender who sent the message
1652  * @param message the actual message
1653  * @param atsi performance data for the connection
1654  * @return GNUNET_OK to keep the connection open,
1655  *         GNUNET_SYSERR to close it (signal serious error)
1656  */
1657 static int
1658 client_handle_transmit_close (void *cls,
1659                               struct GNUNET_MESH_Tunnel *tunnel,
1660                               void **tunnel_ctx,
1661                               const struct GNUNET_PeerIdentity *sender,
1662                               const struct GNUNET_MessageHeader *message,
1663                               const struct GNUNET_ATS_Information*atsi)
1664 {
1665   struct GNUNET_STREAM_Socket *socket = cls;
1666   
1667   return handle_transmit_close (socket,
1668                                 tunnel,
1669                                 sender,
1670                                 (struct GNUNET_STREAM_MessageHeader *)message,
1671                                 atsi);
1672 }
1673
1674
1675 /**
1676  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1677  *
1678  * @param socket the socket
1679  * @param tunnel connection to the other end
1680  * @param sender who sent the message
1681  * @param message the actual message
1682  * @param atsi performance data for the connection
1683  * @param operation the close operation which is being ACK'ed
1684  * @return GNUNET_OK to keep the connection open,
1685  *         GNUNET_SYSERR to close it (signal serious error)
1686  */
1687 static int
1688 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1689                           struct GNUNET_MESH_Tunnel *tunnel,
1690                           const struct GNUNET_PeerIdentity *sender,
1691                           const struct GNUNET_STREAM_MessageHeader *message,
1692                           const struct GNUNET_ATS_Information *atsi,
1693                           int operation)
1694 {
1695   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1696
1697   shutdown_handle = socket->shutdown_handle;
1698   if (NULL == shutdown_handle)
1699   {
1700     LOG (GNUNET_ERROR_TYPE_DEBUG,
1701          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1702          GNUNET_i2s (&socket->other_peer));
1703     return GNUNET_OK;
1704   }
1705   switch (operation)
1706   {
1707   case SHUT_RDWR:
1708     switch (socket->state)
1709     {
1710     case STATE_CLOSE_WAIT:
1711       if (SHUT_RDWR != shutdown_handle->operation)
1712       {
1713         LOG (GNUNET_ERROR_TYPE_DEBUG,
1714              "%s: Received CLOSE_ACK when shutdown handle is not for "
1715              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1716         return GNUNET_OK;
1717       }
1718       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1719            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1720       socket->state = STATE_CLOSED;
1721       break;
1722     default:
1723       LOG (GNUNET_ERROR_TYPE_DEBUG,
1724            "%s: Received CLOSE_ACK when in it not expected\n",
1725            GNUNET_i2s (&socket->other_peer));
1726       return GNUNET_OK;
1727     }
1728     break;
1729   case SHUT_RD:
1730     switch (socket->state)
1731     {
1732     case STATE_RECEIVE_CLOSE_WAIT:
1733       if (SHUT_RD != shutdown_handle->operation)
1734       {
1735         LOG (GNUNET_ERROR_TYPE_DEBUG,
1736              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1737              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1738         return GNUNET_OK;
1739       }
1740       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1741            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1742       socket->state = STATE_RECEIVE_CLOSED;
1743       break;
1744     default:
1745       LOG (GNUNET_ERROR_TYPE_DEBUG,
1746            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1747            GNUNET_i2s (&socket->other_peer));
1748       return GNUNET_OK;
1749     }
1750     break;
1751   case SHUT_WR:
1752     switch (socket->state)
1753     {
1754     case STATE_TRANSMIT_CLOSE_WAIT:
1755       if (SHUT_WR != shutdown_handle->operation)
1756       {
1757         LOG (GNUNET_ERROR_TYPE_DEBUG,
1758              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1759              "is not for SHUT_WR\n",
1760              GNUNET_i2s (&socket->other_peer));
1761         return GNUNET_OK;
1762       }
1763       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1764            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1765       socket->state = STATE_TRANSMIT_CLOSED;
1766       break;
1767     default:
1768       LOG (GNUNET_ERROR_TYPE_DEBUG,
1769            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1770            GNUNET_i2s (&socket->other_peer));          
1771       return GNUNET_OK;
1772     }
1773     break;
1774   default:
1775     GNUNET_assert (0);
1776   }
1777   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1778     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1779                                    operation);
1780   if (GNUNET_SCHEDULER_NO_TASK
1781       != shutdown_handle->close_msg_retransmission_task_id)
1782   {
1783     GNUNET_SCHEDULER_cancel
1784       (shutdown_handle->close_msg_retransmission_task_id);
1785     shutdown_handle->close_msg_retransmission_task_id =
1786         GNUNET_SCHEDULER_NO_TASK;
1787   }
1788   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1789   socket->shutdown_handle = NULL;
1790   return GNUNET_OK;
1791 }
1792
1793
1794 /**
1795  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1796  *
1797  * @param cls the socket (set from GNUNET_MESH_connect)
1798  * @param tunnel connection to the other end
1799  * @param tunnel_ctx this is NULL
1800  * @param sender who sent the message
1801  * @param message the actual message
1802  * @param atsi performance data for the connection
1803  * @return GNUNET_OK to keep the connection open,
1804  *         GNUNET_SYSERR to close it (signal serious error)
1805  */
1806 static int
1807 client_handle_transmit_close_ack (void *cls,
1808                                   struct GNUNET_MESH_Tunnel *tunnel,
1809                                   void **tunnel_ctx,
1810                                   const struct GNUNET_PeerIdentity *sender,
1811                                   const struct GNUNET_MessageHeader *message,
1812                                   const struct GNUNET_ATS_Information*atsi)
1813 {
1814   struct GNUNET_STREAM_Socket *socket = cls;
1815
1816   return handle_generic_close_ack (socket,
1817                                    tunnel,
1818                                    sender,
1819                                    (const struct GNUNET_STREAM_MessageHeader *)
1820                                    message,
1821                                    atsi,
1822                                    SHUT_WR);
1823 }
1824
1825
1826 /**
1827  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1828  *
1829  * @param socket the socket
1830  * @param tunnel connection to the other end
1831  * @param sender who sent the message
1832  * @param message the actual message
1833  * @param atsi performance data for the connection
1834  * @return GNUNET_OK to keep the connection open,
1835  *         GNUNET_SYSERR to close it (signal serious error)
1836  */
1837 static int
1838 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1839                       struct GNUNET_MESH_Tunnel *tunnel,
1840                       const struct GNUNET_PeerIdentity *sender,
1841                       const struct GNUNET_STREAM_MessageHeader *message,
1842                       const struct GNUNET_ATS_Information *atsi)
1843 {
1844   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1845
1846   switch (socket->state)
1847   {
1848   case STATE_INIT:
1849   case STATE_LISTEN:
1850   case STATE_HELLO_WAIT:
1851     LOG (GNUNET_ERROR_TYPE_DEBUG,
1852          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1853          GNUNET_i2s (&socket->other_peer));
1854     return GNUNET_OK;
1855   default:
1856     break;
1857   }
1858   
1859   LOG (GNUNET_ERROR_TYPE_DEBUG,
1860        "%s: Received RECEIVE_CLOSE from %s\n",
1861        GNUNET_i2s (&socket->other_peer),
1862        GNUNET_i2s (&socket->other_peer));
1863   receive_close_ack =
1864     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1865   receive_close_ack->header.size =
1866     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1867   receive_close_ack->header.type =
1868     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1869   queue_message (socket, receive_close_ack, &set_state_closed,
1870                  NULL, GNUNET_NO);  
1871   /* FIXME: Handle the case where write handle is present; the write operation
1872      should be deemed as finised and the write continuation callback
1873      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1874   return GNUNET_OK;
1875 }
1876
1877
1878 /**
1879  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1880  *
1881  * @param cls the socket (set from GNUNET_MESH_connect)
1882  * @param tunnel connection to the other end
1883  * @param tunnel_ctx this is NULL
1884  * @param sender who sent the message
1885  * @param message the actual message
1886  * @param atsi performance data for the connection
1887  * @return GNUNET_OK to keep the connection open,
1888  *         GNUNET_SYSERR to close it (signal serious error)
1889  */
1890 static int
1891 client_handle_receive_close (void *cls,
1892                              struct GNUNET_MESH_Tunnel *tunnel,
1893                              void **tunnel_ctx,
1894                              const struct GNUNET_PeerIdentity *sender,
1895                              const struct GNUNET_MessageHeader *message,
1896                              const struct GNUNET_ATS_Information*atsi)
1897 {
1898   struct GNUNET_STREAM_Socket *socket = cls;
1899
1900   return
1901     handle_receive_close (socket,
1902                           tunnel,
1903                           sender,
1904                           (const struct GNUNET_STREAM_MessageHeader *) message,
1905                           atsi);
1906 }
1907
1908
1909 /**
1910  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1911  *
1912  * @param cls the socket (set from GNUNET_MESH_connect)
1913  * @param tunnel connection to the other end
1914  * @param tunnel_ctx this is NULL
1915  * @param sender who sent the message
1916  * @param message the actual message
1917  * @param atsi performance data for the connection
1918  * @return GNUNET_OK to keep the connection open,
1919  *         GNUNET_SYSERR to close it (signal serious error)
1920  */
1921 static int
1922 client_handle_receive_close_ack (void *cls,
1923                                  struct GNUNET_MESH_Tunnel *tunnel,
1924                                  void **tunnel_ctx,
1925                                  const struct GNUNET_PeerIdentity *sender,
1926                                  const struct GNUNET_MessageHeader *message,
1927                                  const struct GNUNET_ATS_Information*atsi)
1928 {
1929   struct GNUNET_STREAM_Socket *socket = cls;
1930
1931   return handle_generic_close_ack (socket,
1932                                    tunnel,
1933                                    sender,
1934                                    (const struct GNUNET_STREAM_MessageHeader *)
1935                                    message,
1936                                    atsi,
1937                                    SHUT_RD);
1938 }
1939
1940
1941 /**
1942  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1943  *
1944  * @param socket the socket
1945  * @param tunnel connection to the other end
1946  * @param sender who sent the message
1947  * @param message the actual message
1948  * @param atsi performance data for the connection
1949  * @return GNUNET_OK to keep the connection open,
1950  *         GNUNET_SYSERR to close it (signal serious error)
1951  */
1952 static int
1953 handle_close (struct GNUNET_STREAM_Socket *socket,
1954               struct GNUNET_MESH_Tunnel *tunnel,
1955               const struct GNUNET_PeerIdentity *sender,
1956               const struct GNUNET_STREAM_MessageHeader *message,
1957               const struct GNUNET_ATS_Information*atsi)
1958 {
1959   struct GNUNET_STREAM_MessageHeader *close_ack;
1960
1961   switch (socket->state)
1962   {
1963   case STATE_INIT:
1964   case STATE_LISTEN:
1965   case STATE_HELLO_WAIT:
1966     LOG (GNUNET_ERROR_TYPE_DEBUG,
1967          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1968          GNUNET_i2s (&socket->other_peer));
1969     return GNUNET_OK;
1970   default:
1971     break;
1972   }
1973
1974   LOG (GNUNET_ERROR_TYPE_DEBUG,
1975        "%s: Received CLOSE from %s\n",
1976        GNUNET_i2s (&socket->other_peer),
1977        GNUNET_i2s (&socket->other_peer));
1978   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1979   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1980   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1981   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1982   if (socket->state == STATE_CLOSED)
1983     return GNUNET_OK;
1984
1985   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1986   socket->receive_buffer = NULL;
1987   socket->receive_buffer_size = 0;
1988   return GNUNET_OK;
1989 }
1990
1991
1992 /**
1993  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1994  *
1995  * @param cls the socket (set from GNUNET_MESH_connect)
1996  * @param tunnel connection to the other end
1997  * @param tunnel_ctx this is NULL
1998  * @param sender who sent the message
1999  * @param message the actual message
2000  * @param atsi performance data for the connection
2001  * @return GNUNET_OK to keep the connection open,
2002  *         GNUNET_SYSERR to close it (signal serious error)
2003  */
2004 static int
2005 client_handle_close (void *cls,
2006                      struct GNUNET_MESH_Tunnel *tunnel,
2007                      void **tunnel_ctx,
2008                      const struct GNUNET_PeerIdentity *sender,
2009                      const struct GNUNET_MessageHeader *message,
2010                      const struct GNUNET_ATS_Information*atsi)
2011 {
2012   struct GNUNET_STREAM_Socket *socket = cls;
2013
2014   return handle_close (socket,
2015                        tunnel,
2016                        sender,
2017                        (const struct GNUNET_STREAM_MessageHeader *) message,
2018                        atsi);
2019 }
2020
2021
2022 /**
2023  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2024  *
2025  * @param cls the socket (set from GNUNET_MESH_connect)
2026  * @param tunnel connection to the other end
2027  * @param tunnel_ctx this is NULL
2028  * @param sender who sent the message
2029  * @param message the actual message
2030  * @param atsi performance data for the connection
2031  * @return GNUNET_OK to keep the connection open,
2032  *         GNUNET_SYSERR to close it (signal serious error)
2033  */
2034 static int
2035 client_handle_close_ack (void *cls,
2036                          struct GNUNET_MESH_Tunnel *tunnel,
2037                          void **tunnel_ctx,
2038                          const struct GNUNET_PeerIdentity *sender,
2039                          const struct GNUNET_MessageHeader *message,
2040                          const struct GNUNET_ATS_Information *atsi)
2041 {
2042   struct GNUNET_STREAM_Socket *socket = cls;
2043
2044   return handle_generic_close_ack (socket,
2045                                    tunnel,
2046                                    sender,
2047                                    (const struct GNUNET_STREAM_MessageHeader *) 
2048                                    message,
2049                                    atsi,
2050                                    SHUT_RDWR);
2051 }
2052
2053 /*****************************/
2054 /* Server's Message Handlers */
2055 /*****************************/
2056
2057 /**
2058  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2059  *
2060  * @param cls the closure
2061  * @param tunnel connection to the other end
2062  * @param tunnel_ctx the socket
2063  * @param sender who sent the message
2064  * @param message the actual message
2065  * @param atsi performance data for the connection
2066  * @return GNUNET_OK to keep the connection open,
2067  *         GNUNET_SYSERR to close it (signal serious error)
2068  */
2069 static int
2070 server_handle_data (void *cls,
2071                     struct GNUNET_MESH_Tunnel *tunnel,
2072                     void **tunnel_ctx,
2073                     const struct GNUNET_PeerIdentity *sender,
2074                     const struct GNUNET_MessageHeader *message,
2075                     const struct GNUNET_ATS_Information*atsi)
2076 {
2077   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2078
2079   return handle_data (socket,
2080                       tunnel,
2081                       sender,
2082                       (const struct GNUNET_STREAM_DataMessage *)message,
2083                       atsi);
2084 }
2085
2086
2087 /**
2088  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2089  *
2090  * @param cls the closure
2091  * @param tunnel connection to the other end
2092  * @param tunnel_ctx the socket
2093  * @param sender who sent the message
2094  * @param message the actual message
2095  * @param atsi performance data for the connection
2096  * @return GNUNET_OK to keep the connection open,
2097  *         GNUNET_SYSERR to close it (signal serious error)
2098  */
2099 static int
2100 server_handle_hello (void *cls,
2101                      struct GNUNET_MESH_Tunnel *tunnel,
2102                      void **tunnel_ctx,
2103                      const struct GNUNET_PeerIdentity *sender,
2104                      const struct GNUNET_MessageHeader *message,
2105                      const struct GNUNET_ATS_Information*atsi)
2106 {
2107   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2108   struct GNUNET_STREAM_HelloAckMessage *reply;
2109
2110   if (0 != memcmp (sender,
2111                    &socket->other_peer,
2112                    sizeof (struct GNUNET_PeerIdentity)))
2113   {
2114     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2115                GNUNET_i2s (&socket->other_peer));
2116     return GNUNET_YES;
2117   }
2118   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2119   GNUNET_assert (socket->tunnel == tunnel);
2120   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2121              GNUNET_i2s (&socket->other_peer));
2122   switch (socket->state)
2123   {
2124   case STATE_INIT:
2125     reply = generate_hello_ack (socket, GNUNET_YES);
2126     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2127                    GNUNET_NO);
2128     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2129                    socket->control_retransmission_task_id);
2130     socket->control_retransmission_task_id =
2131       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2132                                     &control_retransmission_task, socket);
2133     break;
2134   case STATE_HELLO_WAIT:
2135     /* Perhaps our HELLO_ACK was lost */
2136     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2137                    socket->control_retransmission_task_id);
2138     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2139     socket->control_retransmission_task_id =
2140       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2141     break;
2142   default:
2143     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2144                GNUNET_i2s (&socket->other_peer), socket->state);
2145     /* FIXME: Send RESET? */
2146   }
2147   return GNUNET_OK;
2148 }
2149
2150
2151 /**
2152  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2153  *
2154  * @param cls the closure
2155  * @param tunnel connection to the other end
2156  * @param tunnel_ctx the socket
2157  * @param sender who sent the message
2158  * @param message the actual message
2159  * @param atsi performance data for the connection
2160  * @return GNUNET_OK to keep the connection open,
2161  *         GNUNET_SYSERR to close it (signal serious error)
2162  */
2163 static int
2164 server_handle_hello_ack (void *cls,
2165                          struct GNUNET_MESH_Tunnel *tunnel,
2166                          void **tunnel_ctx,
2167                          const struct GNUNET_PeerIdentity *sender,
2168                          const struct GNUNET_MessageHeader *message,
2169                          const struct GNUNET_ATS_Information*atsi)
2170 {
2171   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2172   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2173
2174   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2175                  ntohs (message->type));
2176   GNUNET_assert (socket->tunnel == tunnel);
2177   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2178   switch (socket->state)  
2179   {
2180   case STATE_HELLO_WAIT:
2181     LOG (GNUNET_ERROR_TYPE_DEBUG,
2182          "%s: Received HELLO_ACK from %s\n",
2183          GNUNET_i2s (&socket->other_peer),
2184          GNUNET_i2s (&socket->other_peer));
2185     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2186     LOG (GNUNET_ERROR_TYPE_DEBUG,
2187          "%s: Read sequence number %u\n",
2188          GNUNET_i2s (&socket->other_peer),
2189          (unsigned int) socket->read_sequence_number);
2190     socket->receiver_window_available = 
2191       ntohl (ack_message->receiver_window_size);
2192     set_state_established (NULL, socket);
2193     break;
2194   default:
2195     LOG (GNUNET_ERROR_TYPE_DEBUG,
2196          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2197   }
2198   return GNUNET_OK;
2199 }
2200
2201
2202 /**
2203  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2204  *
2205  * @param cls the closure
2206  * @param tunnel connection to the other end
2207  * @param tunnel_ctx the socket
2208  * @param sender who sent the message
2209  * @param message the actual message
2210  * @param atsi performance data for the connection
2211  * @return GNUNET_OK to keep the connection open,
2212  *         GNUNET_SYSERR to close it (signal serious error)
2213  */
2214 static int
2215 server_handle_reset (void *cls,
2216                      struct GNUNET_MESH_Tunnel *tunnel,
2217                      void **tunnel_ctx,
2218                      const struct GNUNET_PeerIdentity *sender,
2219                      const struct GNUNET_MessageHeader *message,
2220                      const struct GNUNET_ATS_Information*atsi)
2221 {
2222   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2223
2224   return GNUNET_OK;
2225 }
2226
2227
2228 /**
2229  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2230  *
2231  * @param cls the closure
2232  * @param tunnel connection to the other end
2233  * @param tunnel_ctx the socket
2234  * @param sender who sent the message
2235  * @param message the actual message
2236  * @param atsi performance data for the connection
2237  * @return GNUNET_OK to keep the connection open,
2238  *         GNUNET_SYSERR to close it (signal serious error)
2239  */
2240 static int
2241 server_handle_transmit_close (void *cls,
2242                               struct GNUNET_MESH_Tunnel *tunnel,
2243                               void **tunnel_ctx,
2244                               const struct GNUNET_PeerIdentity *sender,
2245                               const struct GNUNET_MessageHeader *message,
2246                               const struct GNUNET_ATS_Information*atsi)
2247 {
2248   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2249
2250   return handle_transmit_close (socket,
2251                                 tunnel,
2252                                 sender,
2253                                 (struct GNUNET_STREAM_MessageHeader *)message,
2254                                 atsi);
2255 }
2256
2257
2258 /**
2259  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2260  *
2261  * @param cls the closure
2262  * @param tunnel connection to the other end
2263  * @param tunnel_ctx the socket
2264  * @param sender who sent the message
2265  * @param message the actual message
2266  * @param atsi performance data for the connection
2267  * @return GNUNET_OK to keep the connection open,
2268  *         GNUNET_SYSERR to close it (signal serious error)
2269  */
2270 static int
2271 server_handle_transmit_close_ack (void *cls,
2272                                   struct GNUNET_MESH_Tunnel *tunnel,
2273                                   void **tunnel_ctx,
2274                                   const struct GNUNET_PeerIdentity *sender,
2275                                   const struct GNUNET_MessageHeader *message,
2276                                   const struct GNUNET_ATS_Information*atsi)
2277 {
2278   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2279
2280   return handle_generic_close_ack (socket,
2281                                    tunnel,
2282                                    sender,
2283                                    (const struct GNUNET_STREAM_MessageHeader *)
2284                                    message,
2285                                    atsi,
2286                                    SHUT_WR);
2287 }
2288
2289
2290 /**
2291  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2292  *
2293  * @param cls the closure
2294  * @param tunnel connection to the other end
2295  * @param tunnel_ctx the socket
2296  * @param sender who sent the message
2297  * @param message the actual message
2298  * @param atsi performance data for the connection
2299  * @return GNUNET_OK to keep the connection open,
2300  *         GNUNET_SYSERR to close it (signal serious error)
2301  */
2302 static int
2303 server_handle_receive_close (void *cls,
2304                              struct GNUNET_MESH_Tunnel *tunnel,
2305                              void **tunnel_ctx,
2306                              const struct GNUNET_PeerIdentity *sender,
2307                              const struct GNUNET_MessageHeader *message,
2308                              const struct GNUNET_ATS_Information*atsi)
2309 {
2310   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2311
2312   return
2313     handle_receive_close (socket,
2314                           tunnel,
2315                           sender,
2316                           (const struct GNUNET_STREAM_MessageHeader *) message,
2317                           atsi);
2318 }
2319
2320
2321 /**
2322  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2323  *
2324  * @param cls the closure
2325  * @param tunnel connection to the other end
2326  * @param tunnel_ctx the socket
2327  * @param sender who sent the message
2328  * @param message the actual message
2329  * @param atsi performance data for the connection
2330  * @return GNUNET_OK to keep the connection open,
2331  *         GNUNET_SYSERR to close it (signal serious error)
2332  */
2333 static int
2334 server_handle_receive_close_ack (void *cls,
2335                                  struct GNUNET_MESH_Tunnel *tunnel,
2336                                  void **tunnel_ctx,
2337                                  const struct GNUNET_PeerIdentity *sender,
2338                                  const struct GNUNET_MessageHeader *message,
2339                                  const struct GNUNET_ATS_Information*atsi)
2340 {
2341   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2342
2343   return handle_generic_close_ack (socket,
2344                                    tunnel,
2345                                    sender,
2346                                    (const struct GNUNET_STREAM_MessageHeader *)
2347                                    message,
2348                                    atsi,
2349                                    SHUT_RD);
2350 }
2351
2352
2353 /**
2354  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2355  *
2356  * @param cls the listen socket (from GNUNET_MESH_connect in
2357  *          GNUNET_STREAM_listen) 
2358  * @param tunnel connection to the other end
2359  * @param tunnel_ctx the socket
2360  * @param sender who sent the message
2361  * @param message the actual message
2362  * @param atsi performance data for the connection
2363  * @return GNUNET_OK to keep the connection open,
2364  *         GNUNET_SYSERR to close it (signal serious error)
2365  */
2366 static int
2367 server_handle_close (void *cls,
2368                      struct GNUNET_MESH_Tunnel *tunnel,
2369                      void **tunnel_ctx,
2370                      const struct GNUNET_PeerIdentity *sender,
2371                      const struct GNUNET_MessageHeader *message,
2372                      const struct GNUNET_ATS_Information*atsi)
2373 {
2374   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2375   
2376   return handle_close (socket,
2377                        tunnel,
2378                        sender,
2379                        (const struct GNUNET_STREAM_MessageHeader *) message,
2380                        atsi);
2381 }
2382
2383
2384 /**
2385  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2386  *
2387  * @param cls the closure
2388  * @param tunnel connection to the other end
2389  * @param tunnel_ctx the socket
2390  * @param sender who sent the message
2391  * @param message the actual message
2392  * @param atsi performance data for the connection
2393  * @return GNUNET_OK to keep the connection open,
2394  *         GNUNET_SYSERR to close it (signal serious error)
2395  */
2396 static int
2397 server_handle_close_ack (void *cls,
2398                          struct GNUNET_MESH_Tunnel *tunnel,
2399                          void **tunnel_ctx,
2400                          const struct GNUNET_PeerIdentity *sender,
2401                          const struct GNUNET_MessageHeader *message,
2402                          const struct GNUNET_ATS_Information*atsi)
2403 {
2404   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2405
2406   return handle_generic_close_ack (socket,
2407                                    tunnel,
2408                                    sender,
2409                                    (const struct GNUNET_STREAM_MessageHeader *) 
2410                                    message,
2411                                    atsi,
2412                                    SHUT_RDWR);
2413 }
2414
2415
2416 /**
2417  * Handler for DATA_ACK messages
2418  *
2419  * @param socket the socket through which the ack was received
2420  * @param tunnel connection to the other end
2421  * @param sender who sent the message
2422  * @param ack the acknowledgment message
2423  * @param atsi performance data for the connection
2424  * @return GNUNET_OK to keep the connection open,
2425  *         GNUNET_SYSERR to close it (signal serious error)
2426  */
2427 static int
2428 handle_ack (struct GNUNET_STREAM_Socket *socket,
2429             struct GNUNET_MESH_Tunnel *tunnel,
2430             const struct GNUNET_PeerIdentity *sender,
2431             const struct GNUNET_STREAM_AckMessage *ack,
2432             const struct GNUNET_ATS_Information*atsi)
2433 {
2434   unsigned int packet;
2435   int need_retransmission;
2436   uint32_t sequence_difference;
2437   
2438   if (0 != memcmp (sender,
2439                    &socket->other_peer,
2440                    sizeof (struct GNUNET_PeerIdentity)))
2441   {
2442     LOG (GNUNET_ERROR_TYPE_DEBUG,
2443          "%s: Received ACK from non-confirming peer\n",
2444          GNUNET_i2s (&socket->other_peer));
2445     return GNUNET_YES;
2446   }
2447   switch (socket->state)
2448   {
2449   case (STATE_ESTABLISHED):
2450   case (STATE_RECEIVE_CLOSED):
2451   case (STATE_RECEIVE_CLOSE_WAIT):
2452     if (NULL == socket->write_handle)
2453     {
2454       LOG (GNUNET_ERROR_TYPE_DEBUG,
2455            "%s: Received DATA_ACK when write_handle is NULL\n",
2456            GNUNET_i2s (&socket->other_peer));
2457       return GNUNET_OK;
2458     }
2459     sequence_difference = 
2460         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2461     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2462     {
2463       LOG (GNUNET_ERROR_TYPE_DEBUG,
2464            "%s: Received DATA_ACK with unexpected base sequence number\n",
2465            GNUNET_i2s (&socket->other_peer));
2466       LOG (GNUNET_ERROR_TYPE_DEBUG,
2467            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2468            GNUNET_i2s (&socket->other_peer),
2469            socket->write_sequence_number,
2470            ntohl (ack->base_sequence_number));
2471       return GNUNET_OK;
2472     }
2473     /* FIXME: include the case when write_handle is cancelled - ignore the 
2474        acks */
2475     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2476          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2477     /* Cancel the retransmission task */
2478     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2479     {
2480       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2481       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2482     }
2483     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2484     {
2485       if (NULL == socket->write_handle->messages[packet]) break;
2486       /* BS: Base sequence from ack; PS: sequence num of current packet */
2487       sequence_difference = ntohl (ack->base_sequence_number)
2488         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2489       if ((0 == sequence_difference) ||
2490           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2491         continue; /* The message in our handle is not yet received */
2492       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2493       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2494       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2495                             packet, GNUNET_YES);
2496     }
2497     /* Update the receive window remaining
2498        FIXME : Should update with the value from a data ack with greater
2499        sequence number */
2500     socket->receiver_window_available = 
2501       ntohl (ack->receive_window_remaining);
2502     /* Check if we have received all acknowledgements */
2503     need_retransmission = GNUNET_NO;
2504     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2505     {
2506       if (NULL == socket->write_handle->messages[packet]) break;
2507       if (GNUNET_YES != ackbitmap_is_bit_set 
2508           (&socket->write_handle->ack_bitmap,packet))
2509       {
2510         need_retransmission = GNUNET_YES;
2511         break;
2512       }
2513     }
2514     if (GNUNET_YES == need_retransmission)
2515     {
2516       write_data (socket);
2517     }
2518     else      /* We have to call the write continuation callback now */
2519     {
2520       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2521       
2522       /* Free the packets */
2523       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2524       {
2525         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2526       }
2527       write_handle = socket->write_handle;
2528       socket->write_handle = NULL;
2529       if (NULL != write_handle->write_cont)
2530         write_handle->write_cont (write_handle->write_cont_cls,
2531                                   socket->status,
2532                                   write_handle->size);
2533       /* We are done with the write handle - Freeing it */
2534       GNUNET_free (write_handle);
2535       LOG (GNUNET_ERROR_TYPE_DEBUG,
2536            "%s: Write completion callback completed\n",
2537            GNUNET_i2s (&socket->other_peer));      
2538     }
2539     break;
2540   default:
2541     break;
2542   }
2543   return GNUNET_OK;
2544 }
2545
2546
2547 /**
2548  * Handler for DATA_ACK messages
2549  *
2550  * @param cls the 'struct GNUNET_STREAM_Socket'
2551  * @param tunnel connection to the other end
2552  * @param tunnel_ctx unused
2553  * @param sender who sent the message
2554  * @param message the actual message
2555  * @param atsi performance data for the connection
2556  * @return GNUNET_OK to keep the connection open,
2557  *         GNUNET_SYSERR to close it (signal serious error)
2558  */
2559 static int
2560 client_handle_ack (void *cls,
2561                    struct GNUNET_MESH_Tunnel *tunnel,
2562                    void **tunnel_ctx,
2563                    const struct GNUNET_PeerIdentity *sender,
2564                    const struct GNUNET_MessageHeader *message,
2565                    const struct GNUNET_ATS_Information*atsi)
2566 {
2567   struct GNUNET_STREAM_Socket *socket = cls;
2568   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2569  
2570   return handle_ack (socket, tunnel, sender, ack, atsi);
2571 }
2572
2573
2574 /**
2575  * Handler for DATA_ACK messages
2576  *
2577  * @param cls the server's listen socket
2578  * @param tunnel connection to the other end
2579  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2580  * @param sender who sent the message
2581  * @param message the actual message
2582  * @param atsi performance data for the connection
2583  * @return GNUNET_OK to keep the connection open,
2584  *         GNUNET_SYSERR to close it (signal serious error)
2585  */
2586 static int
2587 server_handle_ack (void *cls,
2588                    struct GNUNET_MESH_Tunnel *tunnel,
2589                    void **tunnel_ctx,
2590                    const struct GNUNET_PeerIdentity *sender,
2591                    const struct GNUNET_MessageHeader *message,
2592                    const struct GNUNET_ATS_Information*atsi)
2593 {
2594   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2595   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2596  
2597   return handle_ack (socket, tunnel, sender, ack, atsi);
2598 }
2599
2600
2601 /**
2602  * For client message handlers, the stream socket is in the
2603  * closure argument.
2604  */
2605 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2606   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2607   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2608    sizeof (struct GNUNET_STREAM_AckMessage) },
2609   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2610    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2611   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2612    sizeof (struct GNUNET_STREAM_MessageHeader)},
2613   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2614    sizeof (struct GNUNET_STREAM_MessageHeader)},
2615   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2616    sizeof (struct GNUNET_STREAM_MessageHeader)},
2617   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2618    sizeof (struct GNUNET_STREAM_MessageHeader)},
2619   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2620    sizeof (struct GNUNET_STREAM_MessageHeader)},
2621   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2622    sizeof (struct GNUNET_STREAM_MessageHeader)},
2623   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2624    sizeof (struct GNUNET_STREAM_MessageHeader)},
2625   {NULL, 0, 0}
2626 };
2627
2628
2629 /**
2630  * For server message handlers, the stream socket is in the
2631  * tunnel context, and the listen socket in the closure argument.
2632  */
2633 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2634   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2635   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2636    sizeof (struct GNUNET_STREAM_AckMessage) },
2637   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2638    sizeof (struct GNUNET_STREAM_MessageHeader)},
2639   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2640    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2641   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2642    sizeof (struct GNUNET_STREAM_MessageHeader)},
2643   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2644    sizeof (struct GNUNET_STREAM_MessageHeader)},
2645   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2646    sizeof (struct GNUNET_STREAM_MessageHeader)},
2647   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2648    sizeof (struct GNUNET_STREAM_MessageHeader)},
2649   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2650    sizeof (struct GNUNET_STREAM_MessageHeader)},
2651   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2652    sizeof (struct GNUNET_STREAM_MessageHeader)},
2653   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2654    sizeof (struct GNUNET_STREAM_MessageHeader)},
2655   {NULL, 0, 0}
2656 };
2657
2658
2659 /**
2660  * Function called when our target peer is connected to our tunnel
2661  *
2662  * @param cls the socket for which this tunnel is created
2663  * @param peer the peer identity of the target
2664  * @param atsi performance data for the connection
2665  */
2666 static void
2667 mesh_peer_connect_callback (void *cls,
2668                             const struct GNUNET_PeerIdentity *peer,
2669                             const struct GNUNET_ATS_Information * atsi)
2670 {
2671   struct GNUNET_STREAM_Socket *socket = cls;
2672   struct GNUNET_STREAM_MessageHeader *message;
2673   
2674   if (0 != memcmp (peer,
2675                    &socket->other_peer,
2676                    sizeof (struct GNUNET_PeerIdentity)))
2677   {
2678     LOG (GNUNET_ERROR_TYPE_DEBUG,
2679          "%s: A peer which is not our target has connected to our tunnel\n",
2680          GNUNET_i2s(peer));
2681     return;
2682   }
2683   LOG (GNUNET_ERROR_TYPE_DEBUG,
2684        "%s: Target peer %s connected\n",
2685        GNUNET_i2s (&socket->other_peer),
2686        GNUNET_i2s (&socket->other_peer));
2687   /* Set state to INIT */
2688   socket->state = STATE_INIT;
2689   /* Send HELLO message */
2690   message = generate_hello ();
2691   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2692   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2693                  socket->control_retransmission_task_id);
2694   socket->control_retransmission_task_id =
2695     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2696                                   &control_retransmission_task, socket);
2697 }
2698
2699
2700 /**
2701  * Function called when our target peer is disconnected from our tunnel
2702  *
2703  * @param cls the socket associated which this tunnel
2704  * @param peer the peer identity of the target
2705  */
2706 static void
2707 mesh_peer_disconnect_callback (void *cls,
2708                                const struct GNUNET_PeerIdentity *peer)
2709 {
2710   struct GNUNET_STREAM_Socket *socket=cls;
2711   
2712   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2713   LOG (GNUNET_ERROR_TYPE_DEBUG,
2714        "%s: Other peer %s disconnected \n",
2715        GNUNET_i2s (&socket->other_peer),
2716        GNUNET_i2s (&socket->other_peer));
2717 }
2718
2719
2720 /**
2721  * Method called whenever a peer creates a tunnel to us
2722  *
2723  * @param cls closure
2724  * @param tunnel new handle to the tunnel
2725  * @param initiator peer that started the tunnel
2726  * @param atsi performance information for the tunnel
2727  * @return initial tunnel context for the tunnel
2728  *         (can be NULL -- that's not an error)
2729  */
2730 static void *
2731 new_tunnel_notify (void *cls,
2732                    struct GNUNET_MESH_Tunnel *tunnel,
2733                    const struct GNUNET_PeerIdentity *initiator,
2734                    const struct GNUNET_ATS_Information *atsi)
2735 {
2736   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2737   struct GNUNET_STREAM_Socket *socket;
2738
2739   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2740      from the same peer again until the socket is closed */
2741
2742   if (GNUNET_NO == lsocket->listening)
2743   {
2744     GNUNET_MESH_tunnel_destroy (tunnel);
2745     return NULL;
2746   }
2747   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2748   socket->other_peer = *initiator;
2749   socket->tunnel = tunnel;
2750   socket->state = STATE_INIT;
2751   socket->lsocket = lsocket;
2752   socket->stat_handle = lsocket->stat_handle;
2753   socket->retransmit_timeout = lsocket->retransmit_timeout;
2754   socket->testing_active = lsocket->testing_active;
2755   socket->testing_set_write_sequence_number_value =
2756       lsocket->testing_set_write_sequence_number_value;
2757   socket->max_payload_size = lsocket->max_payload_size;
2758   LOG (GNUNET_ERROR_TYPE_DEBUG,
2759        "%s: Peer %s initiated tunnel to us\n", 
2760        GNUNET_i2s (&socket->other_peer),
2761        GNUNET_i2s (&socket->other_peer));
2762   if (NULL != socket->stat_handle)
2763   {
2764     GNUNET_STATISTICS_update (socket->stat_handle,
2765                               "total inbound connections received",
2766                               1, GNUNET_NO);
2767     GNUNET_STATISTICS_update (socket->stat_handle,
2768                               "inbound connections", 1, GNUNET_NO);
2769   }
2770   
2771   return socket;
2772 }
2773
2774
2775 /**
2776  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2777  * any associated state.  This function is NOT called if the client has
2778  * explicitly asked for the tunnel to be destroyed using
2779  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2780  * the tunnel.
2781  *
2782  * @param cls closure (set from GNUNET_MESH_connect)
2783  * @param tunnel connection to the other end (henceforth invalid)
2784  * @param tunnel_ctx place where local state associated
2785  *                   with the tunnel is stored
2786  */
2787 static void 
2788 tunnel_cleaner (void *cls,
2789                 const struct GNUNET_MESH_Tunnel *tunnel,
2790                 void *tunnel_ctx)
2791 {
2792   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2793   struct MessageQueue *head;
2794
2795   GNUNET_assert (tunnel == socket->tunnel);
2796   GNUNET_break_op(0);
2797   LOG (GNUNET_ERROR_TYPE_DEBUG,
2798        "%s: Peer %s has terminated connection abruptly\n",
2799        GNUNET_i2s (&socket->other_peer),
2800        GNUNET_i2s (&socket->other_peer));
2801   if (NULL != socket->stat_handle)
2802   {
2803     GNUNET_STATISTICS_update (socket->stat_handle,
2804                               "connections terminated abruptly", 1, GNUNET_NO);
2805     GNUNET_STATISTICS_update (socket->stat_handle,
2806                               "inbound connections", -1, GNUNET_NO);
2807   }
2808   socket->status = GNUNET_STREAM_SHUTDOWN;
2809   /* Clear Transmit handles */
2810   if (NULL != socket->transmit_handle)
2811   {
2812     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2813     socket->transmit_handle = NULL;
2814   }
2815   /* Stop Tasks using socket->tunnel */
2816   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2817   {
2818     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2819     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2820   }
2821   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2822   {
2823     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2824     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2825   }  
2826   /* Terminate the control retransmission tasks */
2827   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2828   {
2829     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2830     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2831   }
2832   /* Clear Transmit handles */
2833   if (NULL != socket->transmit_handle)
2834   {
2835     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2836     socket->transmit_handle = NULL;
2837   }
2838   /* Clear existing message queue */
2839   while (NULL != (head = socket->queue_head)) {
2840     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2841                                  socket->queue_tail,
2842                                  head);
2843     GNUNET_free (head->message);
2844     GNUNET_free (head);
2845   }
2846   socket->tunnel = NULL;
2847 }
2848
2849
2850 /**
2851  * Callback to signal timeout on lockmanager lock acquire
2852  *
2853  * @param cls the ListenSocket
2854  * @param tc the scheduler task context
2855  */
2856 static void
2857 lockmanager_acquire_timeout (void *cls, 
2858                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2859 {
2860   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2861   GNUNET_STREAM_ListenCallback listen_cb;
2862   void *listen_cb_cls;
2863
2864   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2865   listen_cb = lsocket->listen_cb;
2866   listen_cb_cls = lsocket->listen_cb_cls;
2867   if (NULL != listen_cb)
2868     listen_cb (listen_cb_cls, NULL, NULL);
2869 }
2870
2871
2872 /**
2873  * Callback to notify us on the status changes on app_port lock
2874  *
2875  * @param cls the ListenSocket
2876  * @param domain the domain name of the lock
2877  * @param lock the app_port
2878  * @param status the current status of the lock
2879  */
2880 static void
2881 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2882                        enum GNUNET_LOCKMANAGER_Status status)
2883 {
2884   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2885
2886   GNUNET_assert (lock == (uint32_t) lsocket->port);
2887   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2888   {
2889     lsocket->listening = GNUNET_YES;
2890     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2891     {
2892       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2893       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2894     }
2895     if (NULL == lsocket->mesh)
2896     {
2897       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2898
2899       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2900                                            lsocket, /* Closure */
2901                                            &new_tunnel_notify,
2902                                            &tunnel_cleaner,
2903                                            server_message_handlers,
2904                                            ports);
2905       GNUNET_assert (NULL != lsocket->mesh);
2906       if (NULL != lsocket->listen_ok_cb)
2907       {
2908         (void) lsocket->listen_ok_cb ();
2909       }
2910     }
2911   }
2912   if (GNUNET_LOCKMANAGER_RELEASE == status)
2913     lsocket->listening = GNUNET_NO;
2914 }
2915
2916
2917 /*****************/
2918 /* API functions */
2919 /*****************/
2920
2921
2922 /**
2923  * Tries to open a stream to the target peer
2924  *
2925  * @param cfg configuration to use
2926  * @param target the target peer to which the stream has to be opened
2927  * @param app_port the application port number which uniquely identifies this
2928  *            stream
2929  * @param open_cb this function will be called after stream has be established;
2930  *          cannot be NULL
2931  * @param open_cb_cls the closure for open_cb
2932  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2933  * @return if successful it returns the stream socket; NULL if stream cannot be
2934  *         opened 
2935  */
2936 struct GNUNET_STREAM_Socket *
2937 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2938                     const struct GNUNET_PeerIdentity *target,
2939                     GNUNET_MESH_ApplicationType app_port,
2940                     GNUNET_STREAM_OpenCallback open_cb,
2941                     void *open_cb_cls,
2942                     ...)
2943 {
2944   struct GNUNET_STREAM_Socket *socket;
2945   enum GNUNET_STREAM_Option option;
2946   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2947   va_list vargs;
2948   uint16_t payload_size;
2949
2950   LOG (GNUNET_ERROR_TYPE_DEBUG,
2951        "%s\n", __func__);
2952   GNUNET_assert (NULL != open_cb);
2953   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2954   socket->other_peer = *target;
2955   socket->open_cb = open_cb;
2956   socket->open_cls = open_cb_cls;
2957   /* Set defaults */
2958   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2959   socket->testing_active = GNUNET_NO;
2960   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2961   va_start (vargs, open_cb_cls); /* Parse variable args */
2962   do {
2963     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2964     switch (option)
2965     {
2966     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2967       /* Expect struct GNUNET_TIME_Relative */
2968       socket->retransmit_timeout = va_arg (vargs,
2969                                            struct GNUNET_TIME_Relative);
2970       break;
2971     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2972       socket->testing_active = GNUNET_YES;
2973       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2974                                                                 uint32_t);
2975       break;
2976     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2977       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2978       break;
2979     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2980       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2981       break;
2982     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2983       payload_size = (uint16_t) va_arg (vargs, unsigned int);
2984       GNUNET_assert (0 != payload_size);
2985       if (payload_size < socket->max_payload_size)
2986         socket->max_payload_size = payload_size;
2987       break;
2988     case GNUNET_STREAM_OPTION_END:
2989       break;
2990     }
2991   } while (GNUNET_STREAM_OPTION_END != option);
2992   va_end (vargs);               /* End of variable args parsing */
2993   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2994                                       socket, /* cls */
2995                                       NULL, /* No inbound tunnel handler */
2996                                       NULL, /* No in-tunnel cleaner */
2997                                       client_message_handlers,
2998                                       ports); /* We don't get inbound tunnels */
2999   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3000   {
3001     GNUNET_free (socket);
3002     return NULL;
3003   }
3004   /* Now create the mesh tunnel to target */
3005   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3006   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3007                                               NULL, /* Tunnel context */
3008                                               &mesh_peer_connect_callback,
3009                                               &mesh_peer_disconnect_callback,
3010                                               socket);
3011   GNUNET_assert (NULL != socket->tunnel);
3012   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3013                                         &socket->other_peer);
3014   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3015   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3016   return socket;
3017 }
3018
3019
3020 /**
3021  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3022  *
3023  * @param socket the stream socket
3024  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3025  * @param completion_cb the callback that will be called upon successful
3026  *          shutdown of given operation
3027  * @param completion_cls the closure for the completion callback
3028  * @return the shutdown handle
3029  */
3030 struct GNUNET_STREAM_ShutdownHandle *
3031 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3032                         int operation,
3033                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3034                         void *completion_cls)
3035 {
3036   struct GNUNET_STREAM_ShutdownHandle *handle;
3037   struct GNUNET_STREAM_MessageHeader *msg;
3038   
3039   GNUNET_assert (NULL == socket->shutdown_handle);
3040
3041   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3042   handle->socket = socket;
3043   handle->completion_cb = completion_cb;
3044   handle->completion_cls = completion_cls;
3045   socket->shutdown_handle = handle;
3046
3047   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3048   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3049   switch (operation)
3050   {
3051   case SHUT_RD:
3052     handle->operation = SHUT_RD;
3053     if (NULL != socket->read_handle)
3054       LOG (GNUNET_ERROR_TYPE_WARNING,
3055            "Existing read handle should be cancelled before shutting"
3056            " down reading\n");
3057     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3058     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3059                    GNUNET_NO);
3060     break;
3061   case SHUT_WR:
3062     handle->operation = SHUT_WR;
3063     if (NULL != socket->write_handle)
3064       LOG (GNUNET_ERROR_TYPE_WARNING,
3065            "Existing write handle should be cancelled before shutting"
3066            " down writing\n");
3067     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3068     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3069                    GNUNET_NO);
3070     break;
3071   case SHUT_RDWR:
3072     handle->operation = SHUT_RDWR;
3073     if (NULL != socket->write_handle)
3074       LOG (GNUNET_ERROR_TYPE_WARNING,
3075            "Existing write handle should be cancelled before shutting"
3076            " down writing\n");
3077     if (NULL != socket->read_handle)
3078       LOG (GNUNET_ERROR_TYPE_WARNING,
3079            "Existing read handle should be cancelled before shutting"
3080            " down reading\n");
3081     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3082     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3083     break;
3084   default:
3085     LOG (GNUNET_ERROR_TYPE_WARNING,
3086          "GNUNET_STREAM_shutdown called with invalid value for "
3087          "parameter operation -- Ignoring\n");
3088     GNUNET_free (msg);
3089     GNUNET_free (handle);
3090     return NULL;
3091   }
3092   handle->close_msg_retransmission_task_id =
3093     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3094                                   &close_msg_retransmission_task,
3095                                   handle);
3096   return handle;
3097 }
3098
3099
3100 /**
3101  * Cancels a pending shutdown
3102  *
3103  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3104  */
3105 void
3106 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3107 {
3108   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3109     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3110   handle->socket->shutdown_handle = NULL;
3111   GNUNET_free (handle);
3112 }
3113
3114
3115 /**
3116  * Closes the stream
3117  *
3118  * @param socket the stream socket
3119  */
3120 void
3121 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3122 {
3123   struct MessageQueue *head;
3124
3125   if (NULL != socket->read_handle)
3126   {
3127     LOG (GNUNET_ERROR_TYPE_WARNING,
3128          "Closing STREAM socket when a read handle is pending\n");
3129     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3130   }
3131   if (NULL != socket->write_handle)
3132   {
3133     LOG (GNUNET_ERROR_TYPE_WARNING,
3134          "Closing STREAM socket when a write handle is pending\n");
3135     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3136     //socket->write_handle = NULL;
3137   }
3138   /* Terminate the ack'ing task if they are still present */
3139   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3140   {
3141     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3142     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3143   }
3144   /* Terminate the control retransmission tasks */
3145   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3146   {
3147     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3148   }
3149   /* Clear Transmit handles */
3150   if (NULL != socket->transmit_handle)
3151   {
3152     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3153     socket->transmit_handle = NULL;
3154   }
3155   /* Clear existing message queue */
3156   while (NULL != (head = socket->queue_head)) {
3157     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3158                                  socket->queue_tail,
3159                                  head);
3160     GNUNET_free (head->message);
3161     GNUNET_free (head);
3162   }
3163   /* Close associated tunnel */
3164   if (NULL != socket->tunnel)
3165   {
3166     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3167     socket->tunnel = NULL;
3168   }
3169   /* Close mesh connection */
3170   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3171   {
3172     GNUNET_MESH_disconnect (socket->mesh);
3173     socket->mesh = NULL;
3174   }
3175   /* Close statistics connection */
3176   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3177     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3178   /* Release receive buffer */
3179   if (NULL != socket->receive_buffer)
3180   {
3181     GNUNET_free (socket->receive_buffer);
3182   }
3183   GNUNET_free (socket);
3184 }
3185
3186
3187 /**
3188  * Listens for stream connections for a specific application ports
3189  *
3190  * @param cfg the configuration to use
3191  * @param app_port the application port for which new streams will be accepted
3192  * @param listen_cb this function will be called when a peer tries to establish
3193  *            a stream with us
3194  * @param listen_cb_cls closure for listen_cb
3195  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3196  * @return listen socket, NULL for any error
3197  */
3198 struct GNUNET_STREAM_ListenSocket *
3199 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3200                       GNUNET_MESH_ApplicationType app_port,
3201                       GNUNET_STREAM_ListenCallback listen_cb,
3202                       void *listen_cb_cls,
3203                       ...)
3204 {
3205   struct GNUNET_STREAM_ListenSocket *lsocket;
3206   struct GNUNET_TIME_Relative listen_timeout;
3207   enum GNUNET_STREAM_Option option;
3208   va_list vargs;
3209   uint16_t payload_size;
3210
3211   GNUNET_assert (NULL != listen_cb);
3212   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3213   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3214   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3215   if (NULL == lsocket->lockmanager)
3216   {
3217     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3218     GNUNET_free (lsocket);
3219     return NULL;
3220   }
3221   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3222   /* Set defaults */
3223   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3224   lsocket->testing_active = GNUNET_NO;
3225   lsocket->listen_ok_cb = NULL;
3226   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3227   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3228   va_start (vargs, listen_cb_cls);
3229   do {
3230     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3231     switch (option)
3232     {
3233     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3234       lsocket->retransmit_timeout = va_arg (vargs,
3235                                             struct GNUNET_TIME_Relative);
3236       break;
3237     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3238       lsocket->testing_active = GNUNET_YES;
3239       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3240                                                                  uint32_t);
3241       break;
3242     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3243       listen_timeout = GNUNET_TIME_relative_multiply
3244         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3245       break;
3246     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3247       lsocket->listen_ok_cb = va_arg (vargs,
3248                                       GNUNET_STREAM_ListenSuccessCallback);
3249       break;
3250     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3251       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3252       GNUNET_assert (0 != payload_size);
3253       if (payload_size < lsocket->max_payload_size)
3254         lsocket->max_payload_size = payload_size;
3255       break;
3256     case GNUNET_STREAM_OPTION_END:
3257       break;
3258     }
3259   } while (GNUNET_STREAM_OPTION_END != option);
3260   va_end (vargs);
3261   lsocket->port = app_port;
3262   lsocket->listen_cb = listen_cb;
3263   lsocket->listen_cb_cls = listen_cb_cls;
3264   lsocket->locking_request = 
3265     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3266                                      (uint32_t) lsocket->port,
3267                                      &lock_status_change_cb, lsocket);
3268   lsocket->lockmanager_acquire_timeout_task =
3269     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3270                                   &lockmanager_acquire_timeout, lsocket);
3271   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3272                                                    lsocket->cfg);
3273   return lsocket;
3274 }
3275
3276
3277 /**
3278  * Closes the listen socket
3279  *
3280  * @param lsocket the listen socket
3281  */
3282 void
3283 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3284 {
3285   /* Close MESH connection */
3286   if (NULL != lsocket->mesh)
3287     GNUNET_MESH_disconnect (lsocket->mesh);
3288   if (NULL != lsocket->stat_handle)
3289     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3290   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3291   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3292     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3293   if (NULL != lsocket->locking_request)
3294     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3295   if (NULL != lsocket->lockmanager)
3296     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3297   GNUNET_free (lsocket);
3298 }
3299
3300
3301 /**
3302  * Tries to write the given data to the stream. The maximum size of data that
3303  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3304  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3305  * violation, however only the said number of maximum bytes will be written.
3306  *
3307  * @param socket the socket representing a stream
3308  * @param data the data buffer from where the data is written into the stream
3309  * @param size the number of bytes to be written from the data buffer
3310  * @param timeout the timeout period
3311  * @param write_cont the function to call upon writing some bytes into the
3312  *          stream 
3313  * @param write_cont_cls the closure
3314  *
3315  * @return handle to cancel the operation; if a previous write is pending or
3316  *           the stream has been shutdown for this operation then write_cont is
3317  *           immediately called and NULL is returned.
3318  */
3319 struct GNUNET_STREAM_IOWriteHandle *
3320 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3321                      const void *data,
3322                      size_t size,
3323                      struct GNUNET_TIME_Relative timeout,
3324                      GNUNET_STREAM_CompletionContinuation write_cont,
3325                      void *write_cont_cls)
3326 {
3327   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3328   struct GNUNET_STREAM_DataMessage *data_msg;
3329   const void *sweep;
3330   struct GNUNET_TIME_Relative ack_deadline;
3331   unsigned int num_needed_packets;
3332   unsigned int packet;
3333   uint32_t packet_size;
3334   uint32_t payload_size;
3335   uint16_t max_data_packet_size;
3336
3337   LOG (GNUNET_ERROR_TYPE_DEBUG,
3338        "%s\n", __func__);
3339   if (NULL != socket->write_handle)
3340   {
3341     GNUNET_break (0);
3342     return NULL;
3343   }
3344   switch (socket->state)
3345   {
3346   case STATE_TRANSMIT_CLOSED:
3347   case STATE_TRANSMIT_CLOSE_WAIT:
3348   case STATE_CLOSED:
3349   case STATE_CLOSE_WAIT:
3350     if (NULL != write_cont)
3351       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3352     LOG (GNUNET_ERROR_TYPE_DEBUG,
3353          "%s() END\n", __func__);
3354     return NULL;
3355   case STATE_INIT:
3356   case STATE_LISTEN:
3357   case STATE_HELLO_WAIT:
3358     if (NULL != write_cont)
3359       /* FIXME: GNUNET_STREAM_SYSERR?? */
3360       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3361     LOG (GNUNET_ERROR_TYPE_DEBUG,
3362          "%s() END\n", __func__);
3363     return NULL;
3364   case STATE_ESTABLISHED:
3365   case STATE_RECEIVE_CLOSED:
3366   case STATE_RECEIVE_CLOSE_WAIT:
3367     break;
3368   }
3369   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3370     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3371   num_needed_packets =
3372       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3373   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3374   io_handle->socket = socket;
3375   io_handle->write_cont = write_cont;
3376   io_handle->write_cont_cls = write_cont_cls;
3377   io_handle->size = size;
3378   io_handle->packets_sent = 0;
3379   sweep = data;
3380   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3381      determined from RTT */
3382   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3383   /* Divide the given buffer into packets for sending */
3384   max_data_packet_size =
3385       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3386   for (packet=0; packet < num_needed_packets; packet++)
3387   {
3388     if ((packet + 1) * socket->max_payload_size < size) 
3389     {
3390       payload_size = socket->max_payload_size;
3391       packet_size = max_data_packet_size;
3392     }
3393     else 
3394     {
3395       payload_size = size - packet * socket->max_payload_size;
3396       packet_size = 
3397           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3398     }
3399     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3400     io_handle->messages[packet]->header.header.size = htons (packet_size);
3401     io_handle->messages[packet]->header.header.type =
3402       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3403     io_handle->messages[packet]->sequence_number =
3404       htonl (socket->write_sequence_number++);
3405     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3406     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3407        determined from RTT */
3408     io_handle->messages[packet]->ack_deadline =
3409       GNUNET_TIME_relative_hton (ack_deadline);
3410     data_msg = io_handle->messages[packet];
3411     /* Copy data from given buffer to the packet */
3412     memcpy (&data_msg[1], sweep, payload_size);
3413     sweep += payload_size;
3414     socket->write_offset += payload_size;
3415   }
3416   /* ack the last data message. FIXME: remove when we figure out how to do this
3417      using RTT */
3418   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3419       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3420   socket->write_handle = io_handle;
3421   write_data (socket);
3422   LOG (GNUNET_ERROR_TYPE_DEBUG,
3423        "%s() END\n", __func__);
3424   return io_handle;
3425 }
3426
3427
3428 /**
3429  * Tries to read data from the stream.
3430  *
3431  * @param socket the socket representing a stream
3432  * @param timeout the timeout period
3433  * @param proc function to call with data (once only)
3434  * @param proc_cls the closure for proc
3435  *
3436  * @return handle to cancel the operation; NULL is returned if: the stream has
3437  *           been shutdown for this type of opeartion (the DataProcessor is
3438  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3439  *           read handle is present (only one read handle per socket is present
3440  *           at any time)
3441  */
3442 struct GNUNET_STREAM_IOReadHandle *
3443 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3444                     struct GNUNET_TIME_Relative timeout,
3445                     GNUNET_STREAM_DataProcessor proc,
3446                     void *proc_cls)
3447 {
3448   struct GNUNET_STREAM_IOReadHandle *read_handle;
3449   
3450   LOG (GNUNET_ERROR_TYPE_DEBUG,
3451        "%s: %s()\n", 
3452        GNUNET_i2s (&socket->other_peer),
3453        __func__);
3454   /* Return NULL if there is already a read handle; the user has to cancel that
3455      first before continuing or has to wait until it is completed */
3456   if (NULL != socket->read_handle) 
3457     return NULL;
3458   GNUNET_assert (NULL != proc);
3459   switch (socket->state)
3460   {
3461   case STATE_RECEIVE_CLOSED:
3462   case STATE_RECEIVE_CLOSE_WAIT:
3463   case STATE_CLOSED:
3464   case STATE_CLOSE_WAIT:
3465     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3466     LOG (GNUNET_ERROR_TYPE_DEBUG,
3467          "%s: %s() END\n",
3468          GNUNET_i2s (&socket->other_peer),
3469          __func__);
3470     return NULL;
3471   default:
3472     break;
3473   }
3474   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3475   read_handle->proc = proc;
3476   read_handle->proc_cls = proc_cls;
3477   read_handle->socket = socket;
3478   socket->read_handle = read_handle;
3479   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3480                                           0))
3481     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3482                                                           socket);   
3483   read_handle->read_io_timeout_task_id =
3484       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3485   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3486        GNUNET_i2s (&socket->other_peer), __func__);
3487   return read_handle;
3488 }
3489
3490
3491 /**
3492  * Cancel pending write operation.
3493  *
3494  * @param ioh handle to operation to cancel
3495  */
3496 void
3497 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3498 {
3499   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3500   unsigned int packet;
3501
3502   GNUNET_assert (NULL != socket->write_handle);
3503   GNUNET_assert (socket->write_handle == ioh);
3504
3505   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3506   {
3507     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3508     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3509   }
3510
3511   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3512   {
3513     if (NULL == ioh->messages[packet]) break;
3514     GNUNET_free (ioh->messages[packet]);
3515   }
3516       
3517   GNUNET_free (socket->write_handle);
3518   socket->write_handle = NULL;
3519 }
3520
3521
3522 /**
3523  * Cancel pending read operation.
3524  *
3525  * @param ioh handle to operation to cancel
3526  */
3527 void
3528 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3529 {
3530   struct GNUNET_STREAM_Socket *socket;
3531   
3532   socket = ioh->socket;
3533   GNUNET_assert (NULL != socket->read_handle);
3534   GNUNET_assert (ioh == socket->read_handle);
3535   /* Read io time task should be there; if it is already executed then this
3536   read handle is not valid; However upon scheduler shutdown the read io task
3537   may be executed before */
3538   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3539     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3540   /* reading task may be present; if so we have to stop it */
3541   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3542     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3543   GNUNET_free (ioh);
3544   socket->read_handle = NULL;
3545 }
3546
3547 /* end of stream_api.c */