removed fixme
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * Retransmission timeout
179    */
180   struct GNUNET_TIME_Relative retransmit_timeout;
181
182   /**
183    * The Acknowledgement Bitmap
184    */
185   GNUNET_STREAM_AckBitmap ack_bitmap;
186
187   /**
188    * Time when the Acknowledgement was queued
189    */
190   struct GNUNET_TIME_Absolute ack_time_registered;
191
192   /**
193    * Queued Acknowledgement deadline
194    */
195   struct GNUNET_TIME_Relative ack_time_deadline;
196
197   /**
198    * The mesh handle
199    */
200   struct GNUNET_MESH_Handle *mesh;
201
202   /**
203    * The mesh tunnel handle
204    */
205   struct GNUNET_MESH_Tunnel *tunnel;
206
207   /**
208    * Stream open closure
209    */
210   void *open_cls;
211
212   /**
213    * Stream open callback
214    */
215   GNUNET_STREAM_OpenCallback open_cb;
216
217   /**
218    * The current transmit handle (if a pending transmit request exists)
219    */
220   struct GNUNET_MESH_TransmitHandle *transmit_handle;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for retransmission task after timeout
265    */
266   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
267
268   /**
269    * Task identifier for retransmission of control messages
270    */
271   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
272
273   /**
274    * The task for sending timely Acks
275    */
276   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
277
278   /**
279    * The state of the protocol associated with this socket
280    */
281   enum State state;
282
283   /**
284    * The status of the socket
285    */
286   enum GNUNET_STREAM_Status status;
287
288   /**
289    * The number of previous timeouts; FIXME: currently not used
290    */
291   unsigned int retries;
292
293   /**
294    * The application port number (type: uint32_t)
295    */
296   GNUNET_MESH_ApplicationType app_port;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * The write sequence number to be set incase of testing
305    */
306   uint32_t testing_set_write_sequence_number_value;
307
308   /**
309    * Write sequence number. Set to random when sending HELLO(client) and
310    * HELLO_ACK(server) 
311    */
312   uint32_t write_sequence_number;
313
314   /**
315    * Read sequence number. This number's value is determined during handshake
316    */
317   uint32_t read_sequence_number;
318
319   /**
320    * The receiver buffer size
321    */
322   uint32_t receive_buffer_size;
323
324   /**
325    * The receiver buffer boundaries
326    */
327   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
328
329   /**
330    * receiver's available buffer after the last acknowledged packet
331    */
332   uint32_t receiver_window_available;
333
334   /**
335    * The offset pointer used during write operation
336    */
337   uint32_t write_offset;
338
339   /**
340    * The offset after which we are expecting data
341    */
342   uint32_t read_offset;
343
344   /**
345    * The offset upto which user has read from the received buffer
346    */
347   uint32_t copy_offset;
348
349   /**
350    * The maximum size of the data message payload this stream handle can send
351    */
352   uint16_t max_payload_size;
353 };
354
355
356 /**
357  * A socket for listening
358  */
359 struct GNUNET_STREAM_ListenSocket
360 {
361   /**
362    * The mesh handle
363    */
364   struct GNUNET_MESH_Handle *mesh;
365
366   /**
367    * Our configuration
368    */
369   struct GNUNET_CONFIGURATION_Handle *cfg;
370
371   /**
372    * Handle to the lock manager service
373    */
374   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
375
376   /**
377    * The active LockingRequest from lockmanager
378    */
379   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
380
381   /**
382    * Callback to call after acquring a lock and listening
383    */
384   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
385
386   /**
387    * The callback function which is called after successful opening socket
388    */
389   GNUNET_STREAM_ListenCallback listen_cb;
390
391   /**
392    * The call back closure
393    */
394   void *listen_cb_cls;
395
396   /**
397    * The service port
398    */
399   GNUNET_MESH_ApplicationType port;
400   
401   /**
402    * The id of the lockmanager timeout task
403    */
404   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
405
406   /**
407    * The retransmit timeout
408    */
409   struct GNUNET_TIME_Relative retransmit_timeout;
410   
411   /**
412    * Listen enabled?
413    */
414   int listening;
415
416   /**
417    * Whether testing mode is active or not
418    */
419   int testing_active;
420
421   /**
422    * The write sequence number to be set incase of testing
423    */
424   uint32_t testing_set_write_sequence_number_value;
425
426   /**
427    * The maximum size of the data message payload this stream handle can send
428    */
429   uint16_t max_payload_size;
430
431 };
432
433
434 /**
435  * The IO Write Handle
436  */
437 struct GNUNET_STREAM_IOWriteHandle
438 {
439   /**
440    * The socket to which this write handle is associated
441    */
442   struct GNUNET_STREAM_Socket *socket;
443
444   /**
445    * The packet_buffers associated with this Handle
446    */
447   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
448
449   /**
450    * The write continuation callback
451    */
452   GNUNET_STREAM_CompletionContinuation write_cont;
453
454   /**
455    * Write continuation closure
456    */
457   void *write_cont_cls;
458
459   /**
460    * The bitmap of this IOHandle; Corresponding bit for a message is set when
461    * it has been acknowledged by the receiver
462    */
463   GNUNET_STREAM_AckBitmap ack_bitmap;
464
465   /**
466    * Number of bytes in this write handle
467    */
468   size_t size;
469
470   /**
471    * Number of packets already transmitted from this IO handle. Retransmitted
472    * packets are not taken into account here. This is used to determine which
473    * packets account for retransmission and which packets occupy buffer space at
474    * the receiver.
475    */
476   unsigned int packets_sent;
477 };
478
479
480 /**
481  * The IO Read Handle
482  */
483 struct GNUNET_STREAM_IOReadHandle
484 {
485   /**
486    * The socket to which this read handle is associated
487    */
488   struct GNUNET_STREAM_Socket *socket;
489   
490   /**
491    * Callback for the read processor
492    */
493   GNUNET_STREAM_DataProcessor proc;
494
495   /**
496    * The closure pointer for the read processor callback
497    */
498   void *proc_cls;
499
500   /**
501    * Task identifier for the read io timeout task
502    */
503   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
504
505   /**
506    * Task scheduled to continue a read operation.
507    */
508   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
509 };
510
511
512 /**
513  * Handle for Shutdown
514  */
515 struct GNUNET_STREAM_ShutdownHandle
516 {
517   /**
518    * The socket associated with this shutdown handle
519    */
520   struct GNUNET_STREAM_Socket *socket;
521
522   /**
523    * Shutdown completion callback
524    */
525   GNUNET_STREAM_ShutdownCompletion completion_cb;
526
527   /**
528    * Closure for completion callback
529    */
530   void *completion_cls;
531
532   /**
533    * Close message retransmission task id
534    */
535   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
536
537   /**
538    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
539    */
540   int operation;  
541 };
542
543
544 /**
545  * Default value in seconds for various timeouts
546  */
547 static const unsigned int default_timeout = 10;
548
549 /**
550  * The domain name for locks we use here
551  */
552 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
553
554
555 /**
556  * Callback function for sending queued message
557  *
558  * @param cls closure the socket
559  * @param size number of bytes available in buf
560  * @param buf where the callee should write the message
561  * @return number of bytes written to buf
562  */
563 static size_t
564 send_message_notify (void *cls, size_t size, void *buf)
565 {
566   struct GNUNET_STREAM_Socket *socket = cls;
567   struct MessageQueue *head;
568   size_t ret;
569
570   socket->transmit_handle = NULL; /* Remove the transmit handle */
571   head = socket->queue_head;
572   if (NULL == head)
573     return 0; /* just to be safe */
574   if (0 == size)                /* request timed out */
575   {
576     socket->retries++;
577     LOG (GNUNET_ERROR_TYPE_DEBUG,
578          "%s: Message sending timed out. Retry %d \n",
579          GNUNET_i2s (&socket->other_peer),
580          socket->retries);
581     socket->transmit_handle = 
582       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
583                                          GNUNET_NO, /* Corking */
584                                          /* FIXME: exponential backoff */
585                                          socket->retransmit_timeout,
586                                          &socket->other_peer,
587                                          ntohs (head->message->header.size),
588                                          &send_message_notify,
589                                          socket);
590     return 0;
591   }
592   ret = ntohs (head->message->header.size);
593   GNUNET_assert (size >= ret);
594   memcpy (buf, head->message, ret);
595   if (NULL != head->finish_cb)
596   {
597     head->finish_cb (head->finish_cb_cls, socket);
598   }
599   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
600                                socket->queue_tail,
601                                head);
602   GNUNET_free (head->message);
603   GNUNET_free (head);
604   head = socket->queue_head;
605   if (NULL != head)    /* more pending messages to send */
606   {
607     socket->retries = 0;
608     socket->transmit_handle = 
609       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
610                                          GNUNET_NO, /* Corking */
611                                          /* FIXME: exponential backoff */
612                                          socket->retransmit_timeout,
613                                          &socket->other_peer,
614                                          ntohs (head->message->header.size),
615                                          &send_message_notify,
616                                          socket);
617   }
618   return ret;
619 }
620
621
622 /**
623  * Queues a message for sending using the mesh connection of a socket
624  *
625  * @param socket the socket whose mesh connection is used
626  * @param message the message to be sent
627  * @param finish_cb the callback to be called when the message is sent
628  * @param finish_cb_cls the closure for the callback
629  * @param urgent set to GNUNET_YES to add the message to the beginning of the
630  *          queue; GNUNET_NO to add at the tail
631  */
632 static void
633 queue_message (struct GNUNET_STREAM_Socket *socket,
634                struct GNUNET_STREAM_MessageHeader *message,
635                SendFinishCallback finish_cb,
636                void *finish_cb_cls,
637                int urgent)
638 {
639   struct MessageQueue *queue_entity;
640
641   GNUNET_assert 
642     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
643      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
644   LOG (GNUNET_ERROR_TYPE_DEBUG,
645        "%s: Queueing message of type %d and size %d\n",
646        GNUNET_i2s (&socket->other_peer),
647        ntohs (message->header.type),
648        ntohs (message->header.size));
649   GNUNET_assert (NULL != message);
650   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
651   queue_entity->message = message;
652   queue_entity->finish_cb = finish_cb;
653   queue_entity->finish_cb_cls = finish_cb_cls;
654   if (GNUNET_YES == urgent)
655   {
656     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
657                                  queue_entity);
658     if (NULL != socket->transmit_handle)
659     {
660       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
661       socket->transmit_handle = NULL;
662     }
663   }
664   else
665     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
666                                       socket->queue_tail,
667                                       queue_entity);
668   if (NULL == socket->transmit_handle)
669   {
670     socket->retries = 0;
671     socket->transmit_handle = 
672         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
673                                            GNUNET_NO, /* Corking */
674                                            socket->retransmit_timeout,
675                                            &socket->other_peer,
676                                            ntohs (message->header.size),
677                                            &send_message_notify,
678                                            socket);
679   }
680 }
681
682
683 /**
684  * Copies a message and queues it for sending using the mesh connection of
685  * given socket 
686  *
687  * @param socket the socket whose mesh connection is used
688  * @param message the message to be sent
689  * @param finish_cb the callback to be called when the message is sent
690  * @param finish_cb_cls the closure for the callback
691  */
692 static void
693 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
694                         const struct GNUNET_STREAM_MessageHeader *message,
695                         SendFinishCallback finish_cb,
696                         void *finish_cb_cls)
697 {
698   struct GNUNET_STREAM_MessageHeader *msg_copy;
699   uint16_t size;
700   
701   size = ntohs (message->header.size);
702   msg_copy = GNUNET_malloc (size);
703   memcpy (msg_copy, message, size);
704   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
705 }
706
707
708 /**
709  * Writes data using the given socket. The amount of data written is limited by
710  * the receiver_window_size
711  *
712  * @param socket the socket to use
713  */
714 static void 
715 write_data (struct GNUNET_STREAM_Socket *socket);
716
717
718 /**
719  * Task for retransmitting data messages if they aren't ACK before their ack
720  * deadline 
721  *
722  * @param cls the socket
723  * @param tc the Task context
724  */
725 static void
726 data_retransmission_task (void *cls,
727                           const struct GNUNET_SCHEDULER_TaskContext *tc)
728 {
729   struct GNUNET_STREAM_Socket *socket = cls;
730   
731   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
732   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
733     return;
734   LOG (GNUNET_ERROR_TYPE_DEBUG,
735        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
736   write_data (socket);
737 }
738
739
740 /**
741  * Task for sending ACK message
742  *
743  * @param cls the socket
744  * @param tc the Task context
745  */
746 static void
747 ack_task (void *cls,
748           const struct GNUNET_SCHEDULER_TaskContext *tc)
749 {
750   struct GNUNET_STREAM_Socket *socket = cls;
751   struct GNUNET_STREAM_AckMessage *ack_msg;
752
753   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
754   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
755     return;
756   /* Create the ACK Message */
757   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
758   ack_msg->header.header.size = htons (sizeof (struct 
759                                                GNUNET_STREAM_AckMessage));
760   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
761   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
762   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
763   ack_msg->receive_window_remaining = 
764     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
765   /* Queue up ACK for immediate sending */
766   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
767 }
768
769
770 /**
771  * Retransmission task for shutdown messages
772  *
773  * @param cls the shutdown handle
774  * @param tc the Task Context
775  */
776 static void
777 close_msg_retransmission_task (void *cls,
778                                const struct GNUNET_SCHEDULER_TaskContext *tc)
779 {
780   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
781   struct GNUNET_STREAM_MessageHeader *msg;
782   struct GNUNET_STREAM_Socket *socket;
783
784   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
785   GNUNET_assert (NULL != shutdown_handle);
786   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
787     return;
788   socket = shutdown_handle->socket;
789   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
790   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
791   switch (shutdown_handle->operation)
792   {
793   case SHUT_RDWR:
794     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
795     break;
796   case SHUT_RD:
797     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
798     break;
799   case SHUT_WR:
800     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
801     break;
802   default:
803     GNUNET_free (msg);
804     shutdown_handle->close_msg_retransmission_task_id = 
805       GNUNET_SCHEDULER_NO_TASK;
806     return;
807   }
808   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
809   shutdown_handle->close_msg_retransmission_task_id =
810     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
811                                   &close_msg_retransmission_task,
812                                   shutdown_handle);
813 }
814
815
816 /**
817  * Function to modify a bit in GNUNET_STREAM_AckBitmap
818  *
819  * @param bitmap the bitmap to modify
820  * @param bit the bit number to modify
821  * @param value GNUNET_YES to on, GNUNET_NO to off
822  */
823 static void
824 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
825                       unsigned int bit, 
826                       int value)
827 {
828   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
829   if (GNUNET_YES == value)
830     *bitmap |= (1LL << bit);
831   else
832     *bitmap &= ~(1LL << bit);
833 }
834
835
836 /**
837  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
838  *
839  * @param bitmap address of the bitmap that has to be checked
840  * @param bit the bit number to check
841  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
842  */
843 static uint8_t
844 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
845                       unsigned int bit)
846 {
847   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
848   return 0 != (*bitmap & (1LL << bit));
849 }
850
851
852 /**
853  * Writes data using the given socket. The amount of data written is limited by
854  * the receiver_window_size
855  *
856  * @param socket the socket to use
857  */
858 static void 
859 write_data (struct GNUNET_STREAM_Socket *socket)
860 {
861   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
862   unsigned int packet;
863   
864   for (packet=0; packet < io_handle->packets_sent; packet++)
865   {
866     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
867                                            packet))
868     {
869       LOG (GNUNET_ERROR_TYPE_DEBUG,
870            "%s: Retransmitting DATA message with sequence %u\n",
871            GNUNET_i2s (&socket->other_peer),
872            ntohl (io_handle->messages[packet]->sequence_number));
873       copy_and_queue_message (socket,
874                               &io_handle->messages[packet]->header,
875                               NULL,
876                               NULL);
877     }
878   }
879   /* Now send new packets if there is enough buffer space */
880   while ( (NULL != io_handle->messages[packet]) &&
881           (socket->receiver_window_available 
882            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
883           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
884   {
885     socket->receiver_window_available -= 
886       ntohs (io_handle->messages[packet]->header.header.size);
887     LOG (GNUNET_ERROR_TYPE_DEBUG,
888          "%s: Placing DATA message with sequence %u in send queue\n",
889          GNUNET_i2s (&socket->other_peer),
890          ntohl (io_handle->messages[packet]->sequence_number));
891     copy_and_queue_message (socket,
892                             &io_handle->messages[packet]->header,
893                             NULL,
894                             NULL);
895     packet++;
896   }
897   io_handle->packets_sent = packet;
898   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
899     socket->data_retransmission_task_id = 
900       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
901                                     (GNUNET_TIME_UNIT_SECONDS, 8),
902                                     &data_retransmission_task,
903                                     socket);
904 }
905
906
907 /**
908  * Task for calling the read processor
909  *
910  * @param cls the socket
911  * @param tc the task context
912  */
913 static void
914 call_read_processor (void *cls,
915                      const struct GNUNET_SCHEDULER_TaskContext *tc)
916 {
917   struct GNUNET_STREAM_Socket *socket = cls;
918   struct GNUNET_STREAM_IOReadHandle *read_handle;
919   size_t read_size;
920   size_t valid_read_size;
921   unsigned int packet;
922   uint32_t sequence_increase;
923   uint32_t offset_increase;
924
925   read_handle = socket->read_handle;
926   GNUNET_assert (NULL != read_handle);
927   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
928   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
929     return;
930   if (NULL == socket->receive_buffer) 
931     return;
932   GNUNET_assert (NULL != socket->read_handle);
933   GNUNET_assert (NULL != socket->read_handle->proc);
934   /* Check the bitmap for any holes */
935   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
936   {
937     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
938                                            packet))
939       break;
940   }
941   /* We only call read processor if we have the first packet */
942   GNUNET_assert (0 < packet);
943   valid_read_size = 
944     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
945   GNUNET_assert (0 != valid_read_size);
946   /* Cancel the read_io_timeout_task */
947   GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
948   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
949   /* Call the data processor */
950   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
951        GNUNET_i2s (&socket->other_peer));
952   read_size = 
953       socket->read_handle->proc (socket->read_handle->proc_cls,
954                                  socket->status,
955                                  socket->receive_buffer + socket->copy_offset,
956                                  valid_read_size);
957   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
958        GNUNET_i2s (&socket->other_peer), read_size);
959   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
960        GNUNET_i2s (&socket->other_peer));
961   /* Free the read handle */
962   GNUNET_free (socket->read_handle);
963   socket->read_handle = NULL;
964   GNUNET_assert (read_size <= valid_read_size);
965   socket->copy_offset += read_size;
966   /* Determine upto which packet we can remove from the buffer */
967   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
968   {
969     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
970     { packet++; break; }
971     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
972       break;
973   }
974   /* If no packets can be removed we can't move the buffer */
975   if (0 == packet) 
976     return;
977   sequence_increase = packet;
978   LOG (GNUNET_ERROR_TYPE_DEBUG,
979        "%s: Sequence increase after read processor completion: %u\n",
980        GNUNET_i2s (&socket->other_peer), sequence_increase);
981   /* Shift the data in the receive buffer */
982   socket->receive_buffer = 
983     memmove (socket->receive_buffer,
984              socket->receive_buffer 
985              + socket->receive_buffer_boundaries[sequence_increase-1],
986              socket->receive_buffer_size
987              - socket->receive_buffer_boundaries[sequence_increase-1]);
988   /* Shift the bitmap */
989   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
990   /* Set read_sequence_number */
991   socket->read_sequence_number += sequence_increase;
992   /* Set read_offset */
993   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
994   socket->read_offset += offset_increase;
995   /* Fix copy_offset */
996   GNUNET_assert (offset_increase <= socket->copy_offset);
997   socket->copy_offset -= offset_increase;
998   /* Fix relative boundaries */
999   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1000   {
1001     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1002     {
1003       uint32_t ahead_buffer_boundary;
1004
1005       ahead_buffer_boundary = 
1006         socket->receive_buffer_boundaries[packet + sequence_increase];
1007       if (0 == ahead_buffer_boundary)
1008         socket->receive_buffer_boundaries[packet] = 0;
1009       else
1010       {
1011         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1012         socket->receive_buffer_boundaries[packet] = 
1013           ahead_buffer_boundary - offset_increase;
1014       }
1015     }
1016     else
1017       socket->receive_buffer_boundaries[packet] = 0;
1018   }
1019 }
1020
1021
1022 /**
1023  * Cancels the existing read io handle
1024  *
1025  * @param cls the closure from the SCHEDULER call
1026  * @param tc the task context
1027  */
1028 static void
1029 read_io_timeout (void *cls,
1030                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1031 {
1032   struct GNUNET_STREAM_Socket *socket = cls;
1033   struct GNUNET_STREAM_IOReadHandle *read_handle;
1034   GNUNET_STREAM_DataProcessor proc;
1035   void *proc_cls;
1036
1037   read_handle = socket->read_handle;
1038   GNUNET_assert (NULL != read_handle);
1039   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1040   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1041     return;
1042   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1043   {
1044     LOG (GNUNET_ERROR_TYPE_DEBUG,
1045          "%s: Read task timedout - Cancelling it\n",
1046          GNUNET_i2s (&socket->other_peer));
1047     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1048     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1049   }
1050   proc = read_handle->proc;
1051   proc_cls = read_handle->proc_cls;
1052   GNUNET_free (read_handle);
1053   socket->read_handle = NULL;
1054   /* Call the read processor to signal timeout */
1055   proc (proc_cls,
1056         GNUNET_STREAM_TIMEOUT,
1057         NULL,
1058         0);
1059 }
1060
1061
1062 /**
1063  * Handler for DATA messages; Same for both client and server
1064  *
1065  * @param socket the socket through which the ack was received
1066  * @param tunnel connection to the other end
1067  * @param sender who sent the message
1068  * @param msg the data message
1069  * @param atsi performance data for the connection
1070  * @return GNUNET_OK to keep the connection open,
1071  *         GNUNET_SYSERR to close it (signal serious error)
1072  */
1073 static int
1074 handle_data (struct GNUNET_STREAM_Socket *socket,
1075              struct GNUNET_MESH_Tunnel *tunnel,
1076              const struct GNUNET_PeerIdentity *sender,
1077              const struct GNUNET_STREAM_DataMessage *msg,
1078              const struct GNUNET_ATS_Information*atsi)
1079 {
1080   const void *payload;
1081   struct GNUNET_TIME_Relative ack_deadline_rel;
1082   uint32_t bytes_needed;
1083   uint32_t relative_offset;
1084   uint32_t relative_sequence_number;
1085   uint16_t size;
1086
1087   size = htons (msg->header.header.size);
1088   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1089   {
1090     GNUNET_break_op (0);
1091     return GNUNET_SYSERR;
1092   }
1093   if (0 != memcmp (sender, &socket->other_peer,
1094                    sizeof (struct GNUNET_PeerIdentity)))
1095   {
1096     LOG (GNUNET_ERROR_TYPE_DEBUG,
1097          "%s: Received DATA from non-confirming peer\n",
1098          GNUNET_i2s (&socket->other_peer));
1099     return GNUNET_YES;
1100   }
1101   switch (socket->state)
1102   {
1103   case STATE_ESTABLISHED:
1104   case STATE_TRANSMIT_CLOSED:
1105   case STATE_TRANSMIT_CLOSE_WAIT:      
1106     /* check if the message's sequence number is in the range we are
1107        expecting */
1108     relative_sequence_number = 
1109       ntohl (msg->sequence_number) - socket->read_sequence_number;
1110     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1111     {
1112       LOG (GNUNET_ERROR_TYPE_DEBUG,
1113            "%s: Ignoring received message with sequence number %u\n",
1114            GNUNET_i2s (&socket->other_peer),
1115            ntohl (msg->sequence_number));
1116       /* Start ACK sending task if one is not already present */
1117       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1118       {
1119         socket->ack_task_id = 
1120           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1121                                         (msg->ack_deadline),
1122                                         &ack_task,
1123                                         socket);
1124       }
1125       return GNUNET_YES;
1126     }      
1127     /* Check if we have already seen this message */
1128     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1129                                             relative_sequence_number))
1130     {
1131       LOG (GNUNET_ERROR_TYPE_DEBUG,
1132            "%s: Ignoring already received message with sequence number %u\n",
1133            GNUNET_i2s (&socket->other_peer),
1134            ntohl (msg->sequence_number));
1135       /* Start ACK sending task if one is not already present */
1136       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1137       {
1138         socket->ack_task_id = 
1139           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1140                                         (msg->ack_deadline), &ack_task, socket);
1141       }
1142       return GNUNET_YES;
1143     }
1144     LOG (GNUNET_ERROR_TYPE_DEBUG,
1145          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1146          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1147          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1148     /* Check if we have to allocate the buffer */
1149     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1150     relative_offset = ntohl (msg->offset) - socket->read_offset;
1151     bytes_needed = relative_offset + size;
1152     if (bytes_needed > socket->receive_buffer_size)
1153     {
1154       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1155       {
1156         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1157                                                  bytes_needed);
1158         socket->receive_buffer_size = bytes_needed;
1159       }
1160       else
1161       {
1162         LOG (GNUNET_ERROR_TYPE_DEBUG,
1163              "%s: Cannot accommodate packet %d as buffer is full\n",
1164              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1165         return GNUNET_YES;
1166       }
1167     }
1168     /* Copy Data to buffer */
1169     payload = &msg[1];
1170     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1171     memcpy (socket->receive_buffer + relative_offset, payload, size);
1172     socket->receive_buffer_boundaries[relative_sequence_number] = 
1173         relative_offset + size;
1174     /* Modify the ACK bitmap */
1175     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1176                           GNUNET_YES);
1177     /* Start ACK sending task if one is not already present */
1178     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1179     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1180     {
1181       ack_deadline_rel = 
1182           GNUNET_TIME_relative_min (ack_deadline_rel,
1183                                     GNUNET_TIME_relative_multiply
1184                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1185       socket->ack_task_id = 
1186           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1187                                         (msg->ack_deadline), &ack_task, socket);
1188       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1189       socket->ack_time_deadline = ack_deadline_rel;
1190     }
1191     else
1192     {
1193       struct GNUNET_TIME_Relative ack_time_past;
1194       struct GNUNET_TIME_Relative ack_time_remaining;
1195       struct GNUNET_TIME_Relative ack_time_min;
1196       ack_time_past = 
1197           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1198       ack_time_remaining = GNUNET_TIME_relative_subtract
1199           (socket->ack_time_deadline, ack_time_past);
1200       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1201                                                ack_deadline_rel);
1202       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1203                       sizeof (struct GNUNET_TIME_Relative)))
1204       {
1205         ack_deadline_rel = ack_time_min;
1206         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1207         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1208                                                             &ack_task, socket);
1209         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1210         socket->ack_time_deadline = ack_deadline_rel;
1211       }
1212     }
1213     if ((NULL != socket->read_handle) /* A read handle is waiting */
1214         /* There is no current read task */
1215         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1216         /* We have the first packet */
1217         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1218     {
1219       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1220            GNUNET_i2s (&socket->other_peer));
1221       socket->read_handle->read_task_id =
1222           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1223     }
1224     break;
1225   default:
1226     LOG (GNUNET_ERROR_TYPE_DEBUG,
1227          "%s: Received data message when it cannot be handled\n",
1228          GNUNET_i2s (&socket->other_peer));
1229     break;
1230   }
1231   return GNUNET_YES;
1232 }
1233
1234
1235 /**
1236  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1237  *
1238  * @param cls the socket (set from GNUNET_MESH_connect)
1239  * @param tunnel connection to the other end
1240  * @param tunnel_ctx place to store local state associated with the tunnel
1241  * @param sender who sent the message
1242  * @param message the actual message
1243  * @param atsi performance data for the connection
1244  * @return GNUNET_OK to keep the connection open,
1245  *         GNUNET_SYSERR to close it (signal serious error)
1246  */
1247 static int
1248 client_handle_data (void *cls,
1249                     struct GNUNET_MESH_Tunnel *tunnel,
1250                     void **tunnel_ctx,
1251                     const struct GNUNET_PeerIdentity *sender,
1252                     const struct GNUNET_MessageHeader *message,
1253                     const struct GNUNET_ATS_Information*atsi)
1254 {
1255   struct GNUNET_STREAM_Socket *socket = cls;
1256
1257   return handle_data (socket, tunnel, sender, 
1258                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1259 }
1260
1261
1262 /**
1263  * Callback to set state to ESTABLISHED
1264  *
1265  * @param cls the closure NULL;
1266  * @param socket the socket to requiring state change
1267  */
1268 static void
1269 set_state_established (void *cls,
1270                        struct GNUNET_STREAM_Socket *socket)
1271 {
1272   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1273        "%s: Attaining ESTABLISHED state\n",
1274        GNUNET_i2s (&socket->other_peer));
1275   socket->write_offset = 0;
1276   socket->read_offset = 0;
1277   socket->state = STATE_ESTABLISHED;
1278   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1279                  socket->control_retransmission_task_id);
1280   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1281   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1282   if (NULL != socket->lsocket)
1283   {
1284     LOG (GNUNET_ERROR_TYPE_DEBUG,
1285          "%s: Calling listen callback\n",
1286          GNUNET_i2s (&socket->other_peer));
1287     if (GNUNET_SYSERR == 
1288         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1289                                     socket,
1290                                     &socket->other_peer))
1291     {
1292       socket->state = STATE_CLOSED;
1293       /* FIXME: We should close in a decent way (send RST) */
1294       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1295       GNUNET_free (socket);
1296     }
1297   }
1298   else
1299     socket->open_cb (socket->open_cls, socket);
1300 }
1301
1302
1303 /**
1304  * Callback to set state to HELLO_WAIT
1305  *
1306  * @param cls the closure from queue_message
1307  * @param socket the socket to requiring state change
1308  */
1309 static void
1310 set_state_hello_wait (void *cls,
1311                       struct GNUNET_STREAM_Socket *socket)
1312 {
1313   GNUNET_assert (STATE_INIT == socket->state);
1314   LOG (GNUNET_ERROR_TYPE_DEBUG,
1315        "%s: Attaining HELLO_WAIT state\n",
1316        GNUNET_i2s (&socket->other_peer));
1317   socket->state = STATE_HELLO_WAIT;
1318 }
1319
1320
1321 /**
1322  * Callback to set state to CLOSE_WAIT
1323  *
1324  * @param cls the closure from queue_message
1325  * @param socket the socket requiring state change
1326  */
1327 static void
1328 set_state_close_wait (void *cls,
1329                       struct GNUNET_STREAM_Socket *socket)
1330 {
1331   LOG (GNUNET_ERROR_TYPE_DEBUG,
1332        "%s: Attaing CLOSE_WAIT state\n",
1333        GNUNET_i2s (&socket->other_peer));
1334   socket->state = STATE_CLOSE_WAIT;
1335   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1336   socket->receive_buffer = NULL;
1337   socket->receive_buffer_size = 0;
1338 }
1339
1340
1341 /**
1342  * Callback to set state to RECEIVE_CLOSE_WAIT
1343  *
1344  * @param cls the closure from queue_message
1345  * @param socket the socket requiring state change
1346  */
1347 static void
1348 set_state_receive_close_wait (void *cls,
1349                               struct GNUNET_STREAM_Socket *socket)
1350 {
1351   LOG (GNUNET_ERROR_TYPE_DEBUG,
1352        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1353        GNUNET_i2s (&socket->other_peer));
1354   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1355   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1356   socket->receive_buffer = NULL;
1357   socket->receive_buffer_size = 0;
1358 }
1359
1360
1361 /**
1362  * Callback to set state to TRANSMIT_CLOSE_WAIT
1363  *
1364  * @param cls the closure from queue_message
1365  * @param socket the socket requiring state change
1366  */
1367 static void
1368 set_state_transmit_close_wait (void *cls,
1369                                struct GNUNET_STREAM_Socket *socket)
1370 {
1371   LOG (GNUNET_ERROR_TYPE_DEBUG,
1372        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1373        GNUNET_i2s (&socket->other_peer));
1374   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1375 }
1376
1377
1378 /**
1379  * Callback to set state to CLOSED
1380  *
1381  * @param cls the closure from queue_message
1382  * @param socket the socket requiring state change
1383  */
1384 static void
1385 set_state_closed (void *cls,
1386                   struct GNUNET_STREAM_Socket *socket)
1387 {
1388   socket->state = STATE_CLOSED;
1389 }
1390
1391
1392 /**
1393  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1394  *
1395  * @return the generate hello message
1396  */
1397 static struct GNUNET_STREAM_MessageHeader *
1398 generate_hello (void)
1399 {
1400   struct GNUNET_STREAM_MessageHeader *msg;
1401
1402   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1403   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1404   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1405   return msg;
1406 }
1407
1408
1409 /**
1410  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1411  * socket
1412  *
1413  * @param socket the socket for which this HelloAckMessage has to be generated
1414  * @param generate_seq GNUNET_YES to generate the write sequence number,
1415  *          GNUNET_NO to use the existing sequence number
1416  * @return the HelloAckMessage
1417  */
1418 static struct GNUNET_STREAM_HelloAckMessage *
1419 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1420                     int generate_seq)
1421 {
1422   struct GNUNET_STREAM_HelloAckMessage *msg;
1423
1424   if (GNUNET_YES == generate_seq)
1425   {
1426     if (GNUNET_YES == socket->testing_active)
1427       socket->write_sequence_number =
1428         socket->testing_set_write_sequence_number_value;
1429     else
1430       socket->write_sequence_number = 
1431         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1432     LOG_DEBUG ("%s: write sequence number %u\n",
1433                GNUNET_i2s (&socket->other_peer),
1434                (unsigned int) socket->write_sequence_number);
1435   }
1436   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1437   msg->header.header.size = 
1438     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1439   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1440   msg->sequence_number = htonl (socket->write_sequence_number);
1441   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1442   return msg;
1443 }
1444
1445
1446 /**
1447  * Task for retransmitting control messages if they aren't ACK'ed before a
1448  * deadline
1449  *
1450  * @param cls the socket
1451  * @param tc the Task context
1452  */
1453 static void
1454 control_retransmission_task (void *cls,
1455                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1456 {
1457   struct GNUNET_STREAM_Socket *socket = cls;
1458     
1459   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1460   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1461     return;
1462   LOG_DEBUG ("%s: Retransmitting a control message\n",
1463                  GNUNET_i2s (&socket->other_peer));
1464   switch (socket->state)
1465   {
1466   case STATE_INIT:    
1467     GNUNET_break (0);
1468     break;
1469   case STATE_LISTEN:
1470     GNUNET_break (0);
1471     break;
1472   case STATE_HELLO_WAIT:
1473     if (NULL == socket->lsocket) /* We are client */
1474       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1475     else
1476       queue_message (socket,
1477                      (struct GNUNET_STREAM_MessageHeader *)
1478                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1479                      GNUNET_NO);
1480     socket->control_retransmission_task_id =
1481     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1482                                   &control_retransmission_task, socket);
1483     break;
1484   case STATE_ESTABLISHED:
1485     if (NULL == socket->lsocket)
1486       queue_message (socket,
1487                      (struct GNUNET_STREAM_MessageHeader *)
1488                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1489                      GNUNET_NO);
1490     else
1491       GNUNET_break (0);
1492   default:
1493     GNUNET_break (0);
1494   }  
1495 }
1496
1497
1498 /**
1499  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1500  *
1501  * @param cls the socket (set from GNUNET_MESH_connect)
1502  * @param tunnel connection to the other end
1503  * @param tunnel_ctx this is NULL
1504  * @param sender who sent the message
1505  * @param message the actual message
1506  * @param atsi performance data for the connection
1507  * @return GNUNET_OK to keep the connection open,
1508  *         GNUNET_SYSERR to close it (signal serious error)
1509  */
1510 static int
1511 client_handle_hello_ack (void *cls,
1512                          struct GNUNET_MESH_Tunnel *tunnel,
1513                          void **tunnel_ctx,
1514                          const struct GNUNET_PeerIdentity *sender,
1515                          const struct GNUNET_MessageHeader *message,
1516                          const struct GNUNET_ATS_Information*atsi)
1517 {
1518   struct GNUNET_STREAM_Socket *socket = cls;
1519   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1520   struct GNUNET_STREAM_HelloAckMessage *reply;
1521
1522   if (0 != memcmp (sender, &socket->other_peer,
1523                    sizeof (struct GNUNET_PeerIdentity)))
1524   {
1525     LOG (GNUNET_ERROR_TYPE_DEBUG,
1526          "%s: Received HELLO_ACK from non-confirming peer\n",
1527          GNUNET_i2s (&socket->other_peer));
1528     return GNUNET_YES;
1529   }
1530   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1531   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1532        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1533   GNUNET_assert (socket->tunnel == tunnel);
1534   switch (socket->state)
1535   {
1536   case STATE_HELLO_WAIT:
1537     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1538     LOG (GNUNET_ERROR_TYPE_DEBUG,
1539          "%s: Read sequence number %u\n",
1540          GNUNET_i2s (&socket->other_peer),
1541          (unsigned int) socket->read_sequence_number);
1542     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1543     reply = generate_hello_ack (socket, GNUNET_YES);
1544     queue_message (socket, &reply->header, &set_state_established,
1545                    NULL, GNUNET_NO);    
1546     return GNUNET_OK;
1547   case STATE_ESTABLISHED:
1548     // call statistics (# ACKs ignored++)
1549     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1550                    socket->control_retransmission_task_id);
1551     socket->control_retransmission_task_id =
1552       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1553     return GNUNET_OK;
1554   default:
1555     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1556                GNUNET_i2s (&socket->other_peer),
1557                GNUNET_i2s (&socket->other_peer), socket->state);
1558     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1559     return GNUNET_SYSERR;
1560   }
1561 }
1562
1563
1564 /**
1565  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1566  *
1567  * @param cls the socket (set from GNUNET_MESH_connect)
1568  * @param tunnel connection to the other end
1569  * @param tunnel_ctx this is NULL
1570  * @param sender who sent the message
1571  * @param message the actual message
1572  * @param atsi performance data for the connection
1573  * @return GNUNET_OK to keep the connection open,
1574  *         GNUNET_SYSERR to close it (signal serious error)
1575  */
1576 static int
1577 client_handle_reset (void *cls,
1578                      struct GNUNET_MESH_Tunnel *tunnel,
1579                      void **tunnel_ctx,
1580                      const struct GNUNET_PeerIdentity *sender,
1581                      const struct GNUNET_MessageHeader *message,
1582                      const struct GNUNET_ATS_Information*atsi)
1583 {
1584   // struct GNUNET_STREAM_Socket *socket = cls;
1585
1586   return GNUNET_OK;
1587 }
1588
1589
1590 /**
1591  * Common message handler for handling TRANSMIT_CLOSE messages
1592  *
1593  * @param socket the socket through which the ack was received
1594  * @param tunnel connection to the other end
1595  * @param sender who sent the message
1596  * @param msg the transmit close message
1597  * @param atsi performance data for the connection
1598  * @return GNUNET_OK to keep the connection open,
1599  *         GNUNET_SYSERR to close it (signal serious error)
1600  */
1601 static int
1602 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1603                        struct GNUNET_MESH_Tunnel *tunnel,
1604                        const struct GNUNET_PeerIdentity *sender,
1605                        const struct GNUNET_STREAM_MessageHeader *msg,
1606                        const struct GNUNET_ATS_Information*atsi)
1607 {
1608   struct GNUNET_STREAM_MessageHeader *reply;
1609
1610   switch (socket->state)
1611   {
1612   case STATE_ESTABLISHED:
1613     socket->state = STATE_RECEIVE_CLOSED;
1614     /* Send TRANSMIT_CLOSE_ACK */
1615     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1616     reply->header.type = 
1617       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1618     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1619     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1620     break;
1621   default:
1622     /* FIXME: Call statistics? */
1623     break;
1624   }
1625   return GNUNET_YES;
1626 }
1627
1628
1629 /**
1630  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1631  *
1632  * @param cls the socket (set from GNUNET_MESH_connect)
1633  * @param tunnel connection to the other end
1634  * @param tunnel_ctx this is NULL
1635  * @param sender who sent the message
1636  * @param message the actual message
1637  * @param atsi performance data for the connection
1638  * @return GNUNET_OK to keep the connection open,
1639  *         GNUNET_SYSERR to close it (signal serious error)
1640  */
1641 static int
1642 client_handle_transmit_close (void *cls,
1643                               struct GNUNET_MESH_Tunnel *tunnel,
1644                               void **tunnel_ctx,
1645                               const struct GNUNET_PeerIdentity *sender,
1646                               const struct GNUNET_MessageHeader *message,
1647                               const struct GNUNET_ATS_Information*atsi)
1648 {
1649   struct GNUNET_STREAM_Socket *socket = cls;
1650   
1651   return handle_transmit_close (socket,
1652                                 tunnel,
1653                                 sender,
1654                                 (struct GNUNET_STREAM_MessageHeader *)message,
1655                                 atsi);
1656 }
1657
1658
1659 /**
1660  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1661  *
1662  * @param socket the socket
1663  * @param tunnel connection to the other end
1664  * @param sender who sent the message
1665  * @param message the actual message
1666  * @param atsi performance data for the connection
1667  * @param operation the close operation which is being ACK'ed
1668  * @return GNUNET_OK to keep the connection open,
1669  *         GNUNET_SYSERR to close it (signal serious error)
1670  */
1671 static int
1672 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1673                           struct GNUNET_MESH_Tunnel *tunnel,
1674                           const struct GNUNET_PeerIdentity *sender,
1675                           const struct GNUNET_STREAM_MessageHeader *message,
1676                           const struct GNUNET_ATS_Information *atsi,
1677                           int operation)
1678 {
1679   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1680
1681   shutdown_handle = socket->shutdown_handle;
1682   if (NULL == shutdown_handle)
1683   {
1684     LOG (GNUNET_ERROR_TYPE_DEBUG,
1685          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1686          GNUNET_i2s (&socket->other_peer));
1687     return GNUNET_OK;
1688   }
1689   switch (operation)
1690   {
1691   case SHUT_RDWR:
1692     switch (socket->state)
1693     {
1694     case STATE_CLOSE_WAIT:
1695       if (SHUT_RDWR != shutdown_handle->operation)
1696       {
1697         LOG (GNUNET_ERROR_TYPE_DEBUG,
1698              "%s: Received CLOSE_ACK when shutdown handle is not for "
1699              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1700         return GNUNET_OK;
1701       }
1702       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1703            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1704       socket->state = STATE_CLOSED;
1705       break;
1706     default:
1707       LOG (GNUNET_ERROR_TYPE_DEBUG,
1708            "%s: Received CLOSE_ACK when in it not expected\n",
1709            GNUNET_i2s (&socket->other_peer));
1710       return GNUNET_OK;
1711     }
1712     break;
1713   case SHUT_RD:
1714     switch (socket->state)
1715     {
1716     case STATE_RECEIVE_CLOSE_WAIT:
1717       if (SHUT_RD != shutdown_handle->operation)
1718       {
1719         LOG (GNUNET_ERROR_TYPE_DEBUG,
1720              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1721              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1722         return GNUNET_OK;
1723       }
1724       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1725            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1726       socket->state = STATE_RECEIVE_CLOSED;
1727       break;
1728     default:
1729       LOG (GNUNET_ERROR_TYPE_DEBUG,
1730            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1731            GNUNET_i2s (&socket->other_peer));
1732       return GNUNET_OK;
1733     }
1734     break;
1735   case SHUT_WR:
1736     switch (socket->state)
1737     {
1738     case STATE_TRANSMIT_CLOSE_WAIT:
1739       if (SHUT_WR != shutdown_handle->operation)
1740       {
1741         LOG (GNUNET_ERROR_TYPE_DEBUG,
1742              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1743              "is not for SHUT_WR\n",
1744              GNUNET_i2s (&socket->other_peer));
1745         return GNUNET_OK;
1746       }
1747       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1748            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1749       socket->state = STATE_TRANSMIT_CLOSED;
1750       break;
1751     default:
1752       LOG (GNUNET_ERROR_TYPE_DEBUG,
1753            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1754            GNUNET_i2s (&socket->other_peer));          
1755       return GNUNET_OK;
1756     }
1757     break;
1758   default:
1759     GNUNET_assert (0);
1760   }
1761   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1762     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1763                                    operation);
1764   if (GNUNET_SCHEDULER_NO_TASK
1765       != shutdown_handle->close_msg_retransmission_task_id)
1766   {
1767     GNUNET_SCHEDULER_cancel
1768       (shutdown_handle->close_msg_retransmission_task_id);
1769     shutdown_handle->close_msg_retransmission_task_id =
1770         GNUNET_SCHEDULER_NO_TASK;
1771   }
1772   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1773   socket->shutdown_handle = NULL;
1774   return GNUNET_OK;
1775 }
1776
1777
1778 /**
1779  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1780  *
1781  * @param cls the socket (set from GNUNET_MESH_connect)
1782  * @param tunnel connection to the other end
1783  * @param tunnel_ctx this is NULL
1784  * @param sender who sent the message
1785  * @param message the actual message
1786  * @param atsi performance data for the connection
1787  * @return GNUNET_OK to keep the connection open,
1788  *         GNUNET_SYSERR to close it (signal serious error)
1789  */
1790 static int
1791 client_handle_transmit_close_ack (void *cls,
1792                                   struct GNUNET_MESH_Tunnel *tunnel,
1793                                   void **tunnel_ctx,
1794                                   const struct GNUNET_PeerIdentity *sender,
1795                                   const struct GNUNET_MessageHeader *message,
1796                                   const struct GNUNET_ATS_Information*atsi)
1797 {
1798   struct GNUNET_STREAM_Socket *socket = cls;
1799
1800   return handle_generic_close_ack (socket,
1801                                    tunnel,
1802                                    sender,
1803                                    (const struct GNUNET_STREAM_MessageHeader *)
1804                                    message,
1805                                    atsi,
1806                                    SHUT_WR);
1807 }
1808
1809
1810 /**
1811  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1812  *
1813  * @param socket the socket
1814  * @param tunnel connection to the other end
1815  * @param sender who sent the message
1816  * @param message the actual message
1817  * @param atsi performance data for the connection
1818  * @return GNUNET_OK to keep the connection open,
1819  *         GNUNET_SYSERR to close it (signal serious error)
1820  */
1821 static int
1822 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1823                       struct GNUNET_MESH_Tunnel *tunnel,
1824                       const struct GNUNET_PeerIdentity *sender,
1825                       const struct GNUNET_STREAM_MessageHeader *message,
1826                       const struct GNUNET_ATS_Information *atsi)
1827 {
1828   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1829
1830   switch (socket->state)
1831   {
1832   case STATE_INIT:
1833   case STATE_LISTEN:
1834   case STATE_HELLO_WAIT:
1835     LOG (GNUNET_ERROR_TYPE_DEBUG,
1836          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1837          GNUNET_i2s (&socket->other_peer));
1838     return GNUNET_OK;
1839   default:
1840     break;
1841   }
1842   
1843   LOG (GNUNET_ERROR_TYPE_DEBUG,
1844        "%s: Received RECEIVE_CLOSE from %s\n",
1845        GNUNET_i2s (&socket->other_peer),
1846        GNUNET_i2s (&socket->other_peer));
1847   receive_close_ack =
1848     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1849   receive_close_ack->header.size =
1850     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1851   receive_close_ack->header.type =
1852     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1853   queue_message (socket, receive_close_ack, &set_state_closed,
1854                  NULL, GNUNET_NO);  
1855   /* FIXME: Handle the case where write handle is present; the write operation
1856      should be deemed as finised and the write continuation callback
1857      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1858   return GNUNET_OK;
1859 }
1860
1861
1862 /**
1863  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1864  *
1865  * @param cls the socket (set from GNUNET_MESH_connect)
1866  * @param tunnel connection to the other end
1867  * @param tunnel_ctx this is NULL
1868  * @param sender who sent the message
1869  * @param message the actual message
1870  * @param atsi performance data for the connection
1871  * @return GNUNET_OK to keep the connection open,
1872  *         GNUNET_SYSERR to close it (signal serious error)
1873  */
1874 static int
1875 client_handle_receive_close (void *cls,
1876                              struct GNUNET_MESH_Tunnel *tunnel,
1877                              void **tunnel_ctx,
1878                              const struct GNUNET_PeerIdentity *sender,
1879                              const struct GNUNET_MessageHeader *message,
1880                              const struct GNUNET_ATS_Information*atsi)
1881 {
1882   struct GNUNET_STREAM_Socket *socket = cls;
1883
1884   return
1885     handle_receive_close (socket,
1886                           tunnel,
1887                           sender,
1888                           (const struct GNUNET_STREAM_MessageHeader *) message,
1889                           atsi);
1890 }
1891
1892
1893 /**
1894  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1895  *
1896  * @param cls the socket (set from GNUNET_MESH_connect)
1897  * @param tunnel connection to the other end
1898  * @param tunnel_ctx this is NULL
1899  * @param sender who sent the message
1900  * @param message the actual message
1901  * @param atsi performance data for the connection
1902  * @return GNUNET_OK to keep the connection open,
1903  *         GNUNET_SYSERR to close it (signal serious error)
1904  */
1905 static int
1906 client_handle_receive_close_ack (void *cls,
1907                                  struct GNUNET_MESH_Tunnel *tunnel,
1908                                  void **tunnel_ctx,
1909                                  const struct GNUNET_PeerIdentity *sender,
1910                                  const struct GNUNET_MessageHeader *message,
1911                                  const struct GNUNET_ATS_Information*atsi)
1912 {
1913   struct GNUNET_STREAM_Socket *socket = cls;
1914
1915   return handle_generic_close_ack (socket,
1916                                    tunnel,
1917                                    sender,
1918                                    (const struct GNUNET_STREAM_MessageHeader *)
1919                                    message,
1920                                    atsi,
1921                                    SHUT_RD);
1922 }
1923
1924
1925 /**
1926  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1927  *
1928  * @param socket the socket
1929  * @param tunnel connection to the other end
1930  * @param sender who sent the message
1931  * @param message the actual message
1932  * @param atsi performance data for the connection
1933  * @return GNUNET_OK to keep the connection open,
1934  *         GNUNET_SYSERR to close it (signal serious error)
1935  */
1936 static int
1937 handle_close (struct GNUNET_STREAM_Socket *socket,
1938               struct GNUNET_MESH_Tunnel *tunnel,
1939               const struct GNUNET_PeerIdentity *sender,
1940               const struct GNUNET_STREAM_MessageHeader *message,
1941               const struct GNUNET_ATS_Information*atsi)
1942 {
1943   struct GNUNET_STREAM_MessageHeader *close_ack;
1944
1945   switch (socket->state)
1946   {
1947   case STATE_INIT:
1948   case STATE_LISTEN:
1949   case STATE_HELLO_WAIT:
1950     LOG (GNUNET_ERROR_TYPE_DEBUG,
1951          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1952          GNUNET_i2s (&socket->other_peer));
1953     return GNUNET_OK;
1954   default:
1955     break;
1956   }
1957
1958   LOG (GNUNET_ERROR_TYPE_DEBUG,
1959        "%s: Received CLOSE from %s\n",
1960        GNUNET_i2s (&socket->other_peer),
1961        GNUNET_i2s (&socket->other_peer));
1962   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1963   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1964   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1965   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1966   if (socket->state == STATE_CLOSED)
1967     return GNUNET_OK;
1968
1969   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1970   socket->receive_buffer = NULL;
1971   socket->receive_buffer_size = 0;
1972   return GNUNET_OK;
1973 }
1974
1975
1976 /**
1977  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1978  *
1979  * @param cls the socket (set from GNUNET_MESH_connect)
1980  * @param tunnel connection to the other end
1981  * @param tunnel_ctx this is NULL
1982  * @param sender who sent the message
1983  * @param message the actual message
1984  * @param atsi performance data for the connection
1985  * @return GNUNET_OK to keep the connection open,
1986  *         GNUNET_SYSERR to close it (signal serious error)
1987  */
1988 static int
1989 client_handle_close (void *cls,
1990                      struct GNUNET_MESH_Tunnel *tunnel,
1991                      void **tunnel_ctx,
1992                      const struct GNUNET_PeerIdentity *sender,
1993                      const struct GNUNET_MessageHeader *message,
1994                      const struct GNUNET_ATS_Information*atsi)
1995 {
1996   struct GNUNET_STREAM_Socket *socket = cls;
1997
1998   return handle_close (socket,
1999                        tunnel,
2000                        sender,
2001                        (const struct GNUNET_STREAM_MessageHeader *) message,
2002                        atsi);
2003 }
2004
2005
2006 /**
2007  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2008  *
2009  * @param cls the socket (set from GNUNET_MESH_connect)
2010  * @param tunnel connection to the other end
2011  * @param tunnel_ctx this is NULL
2012  * @param sender who sent the message
2013  * @param message the actual message
2014  * @param atsi performance data for the connection
2015  * @return GNUNET_OK to keep the connection open,
2016  *         GNUNET_SYSERR to close it (signal serious error)
2017  */
2018 static int
2019 client_handle_close_ack (void *cls,
2020                          struct GNUNET_MESH_Tunnel *tunnel,
2021                          void **tunnel_ctx,
2022                          const struct GNUNET_PeerIdentity *sender,
2023                          const struct GNUNET_MessageHeader *message,
2024                          const struct GNUNET_ATS_Information *atsi)
2025 {
2026   struct GNUNET_STREAM_Socket *socket = cls;
2027
2028   return handle_generic_close_ack (socket,
2029                                    tunnel,
2030                                    sender,
2031                                    (const struct GNUNET_STREAM_MessageHeader *) 
2032                                    message,
2033                                    atsi,
2034                                    SHUT_RDWR);
2035 }
2036
2037 /*****************************/
2038 /* Server's Message Handlers */
2039 /*****************************/
2040
2041 /**
2042  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2043  *
2044  * @param cls the closure
2045  * @param tunnel connection to the other end
2046  * @param tunnel_ctx the socket
2047  * @param sender who sent the message
2048  * @param message the actual message
2049  * @param atsi performance data for the connection
2050  * @return GNUNET_OK to keep the connection open,
2051  *         GNUNET_SYSERR to close it (signal serious error)
2052  */
2053 static int
2054 server_handle_data (void *cls,
2055                     struct GNUNET_MESH_Tunnel *tunnel,
2056                     void **tunnel_ctx,
2057                     const struct GNUNET_PeerIdentity *sender,
2058                     const struct GNUNET_MessageHeader *message,
2059                     const struct GNUNET_ATS_Information*atsi)
2060 {
2061   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2062
2063   return handle_data (socket,
2064                       tunnel,
2065                       sender,
2066                       (const struct GNUNET_STREAM_DataMessage *)message,
2067                       atsi);
2068 }
2069
2070
2071 /**
2072  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2073  *
2074  * @param cls the closure
2075  * @param tunnel connection to the other end
2076  * @param tunnel_ctx the socket
2077  * @param sender who sent the message
2078  * @param message the actual message
2079  * @param atsi performance data for the connection
2080  * @return GNUNET_OK to keep the connection open,
2081  *         GNUNET_SYSERR to close it (signal serious error)
2082  */
2083 static int
2084 server_handle_hello (void *cls,
2085                      struct GNUNET_MESH_Tunnel *tunnel,
2086                      void **tunnel_ctx,
2087                      const struct GNUNET_PeerIdentity *sender,
2088                      const struct GNUNET_MessageHeader *message,
2089                      const struct GNUNET_ATS_Information*atsi)
2090 {
2091   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2092   struct GNUNET_STREAM_HelloAckMessage *reply;
2093
2094   if (0 != memcmp (sender,
2095                    &socket->other_peer,
2096                    sizeof (struct GNUNET_PeerIdentity)))
2097   {
2098     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2099                GNUNET_i2s (&socket->other_peer));
2100     return GNUNET_YES;
2101   }
2102   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2103   GNUNET_assert (socket->tunnel == tunnel);
2104   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2105              GNUNET_i2s (&socket->other_peer));
2106   switch (socket->status)
2107   {
2108   case STATE_INIT:
2109     reply = generate_hello_ack (socket, GNUNET_YES);
2110     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2111                    GNUNET_NO);
2112     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2113                    socket->control_retransmission_task_id);
2114     socket->control_retransmission_task_id =
2115       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2116                                     &control_retransmission_task, socket);
2117     break;
2118   case STATE_HELLO_WAIT:
2119     /* Perhaps our HELLO_ACK was lost */
2120     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2121                    socket->control_retransmission_task_id);
2122     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2123     socket->control_retransmission_task_id =
2124       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2125     break;
2126   default:
2127     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2128                GNUNET_i2s (&socket->other_peer), socket->state);
2129     /* FIXME: Send RESET? */
2130   }
2131   return GNUNET_OK;
2132 }
2133
2134
2135 /**
2136  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2137  *
2138  * @param cls the closure
2139  * @param tunnel connection to the other end
2140  * @param tunnel_ctx the socket
2141  * @param sender who sent the message
2142  * @param message the actual message
2143  * @param atsi performance data for the connection
2144  * @return GNUNET_OK to keep the connection open,
2145  *         GNUNET_SYSERR to close it (signal serious error)
2146  */
2147 static int
2148 server_handle_hello_ack (void *cls,
2149                          struct GNUNET_MESH_Tunnel *tunnel,
2150                          void **tunnel_ctx,
2151                          const struct GNUNET_PeerIdentity *sender,
2152                          const struct GNUNET_MessageHeader *message,
2153                          const struct GNUNET_ATS_Information*atsi)
2154 {
2155   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2156   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2157
2158   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2159                  ntohs (message->type));
2160   GNUNET_assert (socket->tunnel == tunnel);
2161   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2162   switch (socket->state)  
2163   {
2164   case STATE_HELLO_WAIT:
2165     LOG (GNUNET_ERROR_TYPE_DEBUG,
2166          "%s: Received HELLO_ACK from %s\n",
2167          GNUNET_i2s (&socket->other_peer),
2168          GNUNET_i2s (&socket->other_peer));
2169     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2170     LOG (GNUNET_ERROR_TYPE_DEBUG,
2171          "%s: Read sequence number %u\n",
2172          GNUNET_i2s (&socket->other_peer),
2173          (unsigned int) socket->read_sequence_number);
2174     socket->receiver_window_available = 
2175       ntohl (ack_message->receiver_window_size);
2176     set_state_established (NULL, socket);
2177     break;
2178   default:
2179     LOG (GNUNET_ERROR_TYPE_DEBUG,
2180          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2181   }
2182   return GNUNET_OK;
2183 }
2184
2185
2186 /**
2187  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2188  *
2189  * @param cls the closure
2190  * @param tunnel connection to the other end
2191  * @param tunnel_ctx the socket
2192  * @param sender who sent the message
2193  * @param message the actual message
2194  * @param atsi performance data for the connection
2195  * @return GNUNET_OK to keep the connection open,
2196  *         GNUNET_SYSERR to close it (signal serious error)
2197  */
2198 static int
2199 server_handle_reset (void *cls,
2200                      struct GNUNET_MESH_Tunnel *tunnel,
2201                      void **tunnel_ctx,
2202                      const struct GNUNET_PeerIdentity *sender,
2203                      const struct GNUNET_MessageHeader *message,
2204                      const struct GNUNET_ATS_Information*atsi)
2205 {
2206   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2207
2208   return GNUNET_OK;
2209 }
2210
2211
2212 /**
2213  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2214  *
2215  * @param cls the closure
2216  * @param tunnel connection to the other end
2217  * @param tunnel_ctx the socket
2218  * @param sender who sent the message
2219  * @param message the actual message
2220  * @param atsi performance data for the connection
2221  * @return GNUNET_OK to keep the connection open,
2222  *         GNUNET_SYSERR to close it (signal serious error)
2223  */
2224 static int
2225 server_handle_transmit_close (void *cls,
2226                               struct GNUNET_MESH_Tunnel *tunnel,
2227                               void **tunnel_ctx,
2228                               const struct GNUNET_PeerIdentity *sender,
2229                               const struct GNUNET_MessageHeader *message,
2230                               const struct GNUNET_ATS_Information*atsi)
2231 {
2232   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2233
2234   return handle_transmit_close (socket,
2235                                 tunnel,
2236                                 sender,
2237                                 (struct GNUNET_STREAM_MessageHeader *)message,
2238                                 atsi);
2239 }
2240
2241
2242 /**
2243  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2244  *
2245  * @param cls the closure
2246  * @param tunnel connection to the other end
2247  * @param tunnel_ctx the socket
2248  * @param sender who sent the message
2249  * @param message the actual message
2250  * @param atsi performance data for the connection
2251  * @return GNUNET_OK to keep the connection open,
2252  *         GNUNET_SYSERR to close it (signal serious error)
2253  */
2254 static int
2255 server_handle_transmit_close_ack (void *cls,
2256                                   struct GNUNET_MESH_Tunnel *tunnel,
2257                                   void **tunnel_ctx,
2258                                   const struct GNUNET_PeerIdentity *sender,
2259                                   const struct GNUNET_MessageHeader *message,
2260                                   const struct GNUNET_ATS_Information*atsi)
2261 {
2262   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2263
2264   return handle_generic_close_ack (socket,
2265                                    tunnel,
2266                                    sender,
2267                                    (const struct GNUNET_STREAM_MessageHeader *)
2268                                    message,
2269                                    atsi,
2270                                    SHUT_WR);
2271 }
2272
2273
2274 /**
2275  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2276  *
2277  * @param cls the closure
2278  * @param tunnel connection to the other end
2279  * @param tunnel_ctx the socket
2280  * @param sender who sent the message
2281  * @param message the actual message
2282  * @param atsi performance data for the connection
2283  * @return GNUNET_OK to keep the connection open,
2284  *         GNUNET_SYSERR to close it (signal serious error)
2285  */
2286 static int
2287 server_handle_receive_close (void *cls,
2288                              struct GNUNET_MESH_Tunnel *tunnel,
2289                              void **tunnel_ctx,
2290                              const struct GNUNET_PeerIdentity *sender,
2291                              const struct GNUNET_MessageHeader *message,
2292                              const struct GNUNET_ATS_Information*atsi)
2293 {
2294   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2295
2296   return
2297     handle_receive_close (socket,
2298                           tunnel,
2299                           sender,
2300                           (const struct GNUNET_STREAM_MessageHeader *) message,
2301                           atsi);
2302 }
2303
2304
2305 /**
2306  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2307  *
2308  * @param cls the closure
2309  * @param tunnel connection to the other end
2310  * @param tunnel_ctx the socket
2311  * @param sender who sent the message
2312  * @param message the actual message
2313  * @param atsi performance data for the connection
2314  * @return GNUNET_OK to keep the connection open,
2315  *         GNUNET_SYSERR to close it (signal serious error)
2316  */
2317 static int
2318 server_handle_receive_close_ack (void *cls,
2319                                  struct GNUNET_MESH_Tunnel *tunnel,
2320                                  void **tunnel_ctx,
2321                                  const struct GNUNET_PeerIdentity *sender,
2322                                  const struct GNUNET_MessageHeader *message,
2323                                  const struct GNUNET_ATS_Information*atsi)
2324 {
2325   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2326
2327   return handle_generic_close_ack (socket,
2328                                    tunnel,
2329                                    sender,
2330                                    (const struct GNUNET_STREAM_MessageHeader *)
2331                                    message,
2332                                    atsi,
2333                                    SHUT_RD);
2334 }
2335
2336
2337 /**
2338  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2339  *
2340  * @param cls the listen socket (from GNUNET_MESH_connect in
2341  *          GNUNET_STREAM_listen) 
2342  * @param tunnel connection to the other end
2343  * @param tunnel_ctx the socket
2344  * @param sender who sent the message
2345  * @param message the actual message
2346  * @param atsi performance data for the connection
2347  * @return GNUNET_OK to keep the connection open,
2348  *         GNUNET_SYSERR to close it (signal serious error)
2349  */
2350 static int
2351 server_handle_close (void *cls,
2352                      struct GNUNET_MESH_Tunnel *tunnel,
2353                      void **tunnel_ctx,
2354                      const struct GNUNET_PeerIdentity *sender,
2355                      const struct GNUNET_MessageHeader *message,
2356                      const struct GNUNET_ATS_Information*atsi)
2357 {
2358   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2359   
2360   return handle_close (socket,
2361                        tunnel,
2362                        sender,
2363                        (const struct GNUNET_STREAM_MessageHeader *) message,
2364                        atsi);
2365 }
2366
2367
2368 /**
2369  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2370  *
2371  * @param cls the closure
2372  * @param tunnel connection to the other end
2373  * @param tunnel_ctx the socket
2374  * @param sender who sent the message
2375  * @param message the actual message
2376  * @param atsi performance data for the connection
2377  * @return GNUNET_OK to keep the connection open,
2378  *         GNUNET_SYSERR to close it (signal serious error)
2379  */
2380 static int
2381 server_handle_close_ack (void *cls,
2382                          struct GNUNET_MESH_Tunnel *tunnel,
2383                          void **tunnel_ctx,
2384                          const struct GNUNET_PeerIdentity *sender,
2385                          const struct GNUNET_MessageHeader *message,
2386                          const struct GNUNET_ATS_Information*atsi)
2387 {
2388   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2389
2390   return handle_generic_close_ack (socket,
2391                                    tunnel,
2392                                    sender,
2393                                    (const struct GNUNET_STREAM_MessageHeader *) 
2394                                    message,
2395                                    atsi,
2396                                    SHUT_RDWR);
2397 }
2398
2399
2400 /**
2401  * Handler for DATA_ACK messages
2402  *
2403  * @param socket the socket through which the ack was received
2404  * @param tunnel connection to the other end
2405  * @param sender who sent the message
2406  * @param ack the acknowledgment message
2407  * @param atsi performance data for the connection
2408  * @return GNUNET_OK to keep the connection open,
2409  *         GNUNET_SYSERR to close it (signal serious error)
2410  */
2411 static int
2412 handle_ack (struct GNUNET_STREAM_Socket *socket,
2413             struct GNUNET_MESH_Tunnel *tunnel,
2414             const struct GNUNET_PeerIdentity *sender,
2415             const struct GNUNET_STREAM_AckMessage *ack,
2416             const struct GNUNET_ATS_Information*atsi)
2417 {
2418   unsigned int packet;
2419   int need_retransmission;
2420   uint32_t sequence_difference;
2421   
2422   if (0 != memcmp (sender,
2423                    &socket->other_peer,
2424                    sizeof (struct GNUNET_PeerIdentity)))
2425   {
2426     LOG (GNUNET_ERROR_TYPE_DEBUG,
2427          "%s: Received ACK from non-confirming peer\n",
2428          GNUNET_i2s (&socket->other_peer));
2429     return GNUNET_YES;
2430   }
2431   switch (socket->state)
2432   {
2433   case (STATE_ESTABLISHED):
2434   case (STATE_RECEIVE_CLOSED):
2435   case (STATE_RECEIVE_CLOSE_WAIT):
2436     if (NULL == socket->write_handle)
2437     {
2438       LOG (GNUNET_ERROR_TYPE_DEBUG,
2439            "%s: Received DATA_ACK when write_handle is NULL\n",
2440            GNUNET_i2s (&socket->other_peer));
2441       return GNUNET_OK;
2442     }
2443     sequence_difference = 
2444         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2445     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2446     {
2447       LOG (GNUNET_ERROR_TYPE_DEBUG,
2448            "%s: Received DATA_ACK with unexpected base sequence number\n",
2449            GNUNET_i2s (&socket->other_peer));
2450       LOG (GNUNET_ERROR_TYPE_DEBUG,
2451            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2452            GNUNET_i2s (&socket->other_peer),
2453            socket->write_sequence_number,
2454            ntohl (ack->base_sequence_number));
2455       return GNUNET_OK;
2456     }
2457     /* FIXME: include the case when write_handle is cancelled - ignore the 
2458        acks */
2459     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2460          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2461     /* Cancel the retransmission task */
2462     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2463     {
2464       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2465       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2466     }
2467     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2468     {
2469       if (NULL == socket->write_handle->messages[packet]) break;
2470       /* BS: Base sequence from ack; PS: sequence num of current packet */
2471       sequence_difference = ntohl (ack->base_sequence_number)
2472         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2473       if ((0 == sequence_difference) ||
2474           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2475         continue; /* The message in our handle is not yet received */
2476       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2477       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2478       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2479                             packet, GNUNET_YES);
2480     }
2481     /* Update the receive window remaining
2482        FIXME : Should update with the value from a data ack with greater
2483        sequence number */
2484     socket->receiver_window_available = 
2485       ntohl (ack->receive_window_remaining);
2486     /* Check if we have received all acknowledgements */
2487     need_retransmission = GNUNET_NO;
2488     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2489     {
2490       if (NULL == socket->write_handle->messages[packet]) break;
2491       if (GNUNET_YES != ackbitmap_is_bit_set 
2492           (&socket->write_handle->ack_bitmap,packet))
2493       {
2494         need_retransmission = GNUNET_YES;
2495         break;
2496       }
2497     }
2498     if (GNUNET_YES == need_retransmission)
2499     {
2500       write_data (socket);
2501     }
2502     else      /* We have to call the write continuation callback now */
2503     {
2504       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2505       
2506       /* Free the packets */
2507       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2508       {
2509         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2510       }
2511       write_handle = socket->write_handle;
2512       socket->write_handle = NULL;
2513       if (NULL != write_handle->write_cont)
2514         write_handle->write_cont (write_handle->write_cont_cls,
2515                                   socket->status,
2516                                   write_handle->size);
2517       /* We are done with the write handle - Freeing it */
2518       GNUNET_free (write_handle);
2519       LOG (GNUNET_ERROR_TYPE_DEBUG,
2520            "%s: Write completion callback completed\n",
2521            GNUNET_i2s (&socket->other_peer));      
2522     }
2523     break;
2524   default:
2525     break;
2526   }
2527   return GNUNET_OK;
2528 }
2529
2530
2531 /**
2532  * Handler for DATA_ACK messages
2533  *
2534  * @param cls the 'struct GNUNET_STREAM_Socket'
2535  * @param tunnel connection to the other end
2536  * @param tunnel_ctx unused
2537  * @param sender who sent the message
2538  * @param message the actual message
2539  * @param atsi performance data for the connection
2540  * @return GNUNET_OK to keep the connection open,
2541  *         GNUNET_SYSERR to close it (signal serious error)
2542  */
2543 static int
2544 client_handle_ack (void *cls,
2545                    struct GNUNET_MESH_Tunnel *tunnel,
2546                    void **tunnel_ctx,
2547                    const struct GNUNET_PeerIdentity *sender,
2548                    const struct GNUNET_MessageHeader *message,
2549                    const struct GNUNET_ATS_Information*atsi)
2550 {
2551   struct GNUNET_STREAM_Socket *socket = cls;
2552   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2553  
2554   return handle_ack (socket, tunnel, sender, ack, atsi);
2555 }
2556
2557
2558 /**
2559  * Handler for DATA_ACK messages
2560  *
2561  * @param cls the server's listen socket
2562  * @param tunnel connection to the other end
2563  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2564  * @param sender who sent the message
2565  * @param message the actual message
2566  * @param atsi performance data for the connection
2567  * @return GNUNET_OK to keep the connection open,
2568  *         GNUNET_SYSERR to close it (signal serious error)
2569  */
2570 static int
2571 server_handle_ack (void *cls,
2572                    struct GNUNET_MESH_Tunnel *tunnel,
2573                    void **tunnel_ctx,
2574                    const struct GNUNET_PeerIdentity *sender,
2575                    const struct GNUNET_MessageHeader *message,
2576                    const struct GNUNET_ATS_Information*atsi)
2577 {
2578   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2579   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2580  
2581   return handle_ack (socket, tunnel, sender, ack, atsi);
2582 }
2583
2584
2585 /**
2586  * For client message handlers, the stream socket is in the
2587  * closure argument.
2588  */
2589 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2590   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2591   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2592    sizeof (struct GNUNET_STREAM_AckMessage) },
2593   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2594    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2595   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2596    sizeof (struct GNUNET_STREAM_MessageHeader)},
2597   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2598    sizeof (struct GNUNET_STREAM_MessageHeader)},
2599   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2600    sizeof (struct GNUNET_STREAM_MessageHeader)},
2601   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2602    sizeof (struct GNUNET_STREAM_MessageHeader)},
2603   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2604    sizeof (struct GNUNET_STREAM_MessageHeader)},
2605   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2606    sizeof (struct GNUNET_STREAM_MessageHeader)},
2607   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2608    sizeof (struct GNUNET_STREAM_MessageHeader)},
2609   {NULL, 0, 0}
2610 };
2611
2612
2613 /**
2614  * For server message handlers, the stream socket is in the
2615  * tunnel context, and the listen socket in the closure argument.
2616  */
2617 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2618   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2619   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2620    sizeof (struct GNUNET_STREAM_AckMessage) },
2621   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2622    sizeof (struct GNUNET_STREAM_MessageHeader)},
2623   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2624    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2625   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2626    sizeof (struct GNUNET_STREAM_MessageHeader)},
2627   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2628    sizeof (struct GNUNET_STREAM_MessageHeader)},
2629   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2630    sizeof (struct GNUNET_STREAM_MessageHeader)},
2631   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2632    sizeof (struct GNUNET_STREAM_MessageHeader)},
2633   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2634    sizeof (struct GNUNET_STREAM_MessageHeader)},
2635   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2636    sizeof (struct GNUNET_STREAM_MessageHeader)},
2637   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2638    sizeof (struct GNUNET_STREAM_MessageHeader)},
2639   {NULL, 0, 0}
2640 };
2641
2642
2643 /**
2644  * Function called when our target peer is connected to our tunnel
2645  *
2646  * @param cls the socket for which this tunnel is created
2647  * @param peer the peer identity of the target
2648  * @param atsi performance data for the connection
2649  */
2650 static void
2651 mesh_peer_connect_callback (void *cls,
2652                             const struct GNUNET_PeerIdentity *peer,
2653                             const struct GNUNET_ATS_Information * atsi)
2654 {
2655   struct GNUNET_STREAM_Socket *socket = cls;
2656   struct GNUNET_STREAM_MessageHeader *message;
2657   
2658   if (0 != memcmp (peer,
2659                    &socket->other_peer,
2660                    sizeof (struct GNUNET_PeerIdentity)))
2661   {
2662     LOG (GNUNET_ERROR_TYPE_DEBUG,
2663          "%s: A peer which is not our target has connected to our tunnel\n",
2664          GNUNET_i2s(peer));
2665     return;
2666   }
2667   LOG (GNUNET_ERROR_TYPE_DEBUG,
2668        "%s: Target peer %s connected\n",
2669        GNUNET_i2s (&socket->other_peer),
2670        GNUNET_i2s (&socket->other_peer));
2671   /* Set state to INIT */
2672   socket->state = STATE_INIT;
2673   /* Send HELLO message */
2674   message = generate_hello ();
2675   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2676   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2677                  socket->control_retransmission_task_id);
2678   socket->control_retransmission_task_id =
2679     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2680                                   &control_retransmission_task, socket);
2681 }
2682
2683
2684 /**
2685  * Function called when our target peer is disconnected from our tunnel
2686  *
2687  * @param cls the socket associated which this tunnel
2688  * @param peer the peer identity of the target
2689  */
2690 static void
2691 mesh_peer_disconnect_callback (void *cls,
2692                                const struct GNUNET_PeerIdentity *peer)
2693 {
2694   struct GNUNET_STREAM_Socket *socket=cls;
2695   
2696   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2697   LOG (GNUNET_ERROR_TYPE_DEBUG,
2698        "%s: Other peer %s disconnected \n",
2699        GNUNET_i2s (&socket->other_peer),
2700        GNUNET_i2s (&socket->other_peer));
2701 }
2702
2703
2704 /**
2705  * Method called whenever a peer creates a tunnel to us
2706  *
2707  * @param cls closure
2708  * @param tunnel new handle to the tunnel
2709  * @param initiator peer that started the tunnel
2710  * @param atsi performance information for the tunnel
2711  * @return initial tunnel context for the tunnel
2712  *         (can be NULL -- that's not an error)
2713  */
2714 static void *
2715 new_tunnel_notify (void *cls,
2716                    struct GNUNET_MESH_Tunnel *tunnel,
2717                    const struct GNUNET_PeerIdentity *initiator,
2718                    const struct GNUNET_ATS_Information *atsi)
2719 {
2720   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2721   struct GNUNET_STREAM_Socket *socket;
2722
2723   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2724      from the same peer again until the socket is closed */
2725
2726   if (GNUNET_NO == lsocket->listening)
2727   {
2728     GNUNET_MESH_tunnel_destroy (tunnel);
2729     return NULL;
2730   }
2731   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2732   socket->other_peer = *initiator;
2733   socket->tunnel = tunnel;
2734   socket->state = STATE_INIT;
2735   socket->lsocket = lsocket;
2736   socket->retransmit_timeout = lsocket->retransmit_timeout;
2737   socket->testing_active = lsocket->testing_active;
2738   socket->testing_set_write_sequence_number_value =
2739       lsocket->testing_set_write_sequence_number_value;
2740   socket->max_payload_size = lsocket->max_payload_size;
2741   LOG (GNUNET_ERROR_TYPE_DEBUG,
2742        "%s: Peer %s initiated tunnel to us\n", 
2743        GNUNET_i2s (&socket->other_peer),
2744        GNUNET_i2s (&socket->other_peer));
2745   return socket;
2746 }
2747
2748
2749 /**
2750  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2751  * any associated state.  This function is NOT called if the client has
2752  * explicitly asked for the tunnel to be destroyed using
2753  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2754  * the tunnel.
2755  *
2756  * @param cls closure (set from GNUNET_MESH_connect)
2757  * @param tunnel connection to the other end (henceforth invalid)
2758  * @param tunnel_ctx place where local state associated
2759  *                   with the tunnel is stored
2760  */
2761 static void 
2762 tunnel_cleaner (void *cls,
2763                 const struct GNUNET_MESH_Tunnel *tunnel,
2764                 void *tunnel_ctx)
2765 {
2766   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2767
2768   GNUNET_assert (tunnel == socket->tunnel);
2769   GNUNET_break_op(0);
2770   LOG (GNUNET_ERROR_TYPE_DEBUG,
2771        "%s: Peer %s has terminated connection abruptly\n",
2772        GNUNET_i2s (&socket->other_peer),
2773        GNUNET_i2s (&socket->other_peer));
2774   socket->status = GNUNET_STREAM_SHUTDOWN;
2775   /* Clear Transmit handles */
2776   if (NULL != socket->transmit_handle)
2777   {
2778     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2779     socket->transmit_handle = NULL;
2780   }
2781   /* Stop Tasks using socket->tunnel */
2782   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2783   {
2784     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2785     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2786   }
2787   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2788   {
2789     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2790     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2791   }
2792   /* FIXME: Cancel all other tasks using socket->tunnel */
2793   socket->tunnel = NULL;
2794 }
2795
2796
2797 /**
2798  * Callback to signal timeout on lockmanager lock acquire
2799  *
2800  * @param cls the ListenSocket
2801  * @param tc the scheduler task context
2802  */
2803 static void
2804 lockmanager_acquire_timeout (void *cls, 
2805                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2806 {
2807   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2808   GNUNET_STREAM_ListenCallback listen_cb;
2809   void *listen_cb_cls;
2810
2811   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2812   listen_cb = lsocket->listen_cb;
2813   listen_cb_cls = lsocket->listen_cb_cls;
2814   if (NULL != listen_cb)
2815     listen_cb (listen_cb_cls, NULL, NULL);
2816 }
2817
2818
2819 /**
2820  * Callback to notify us on the status changes on app_port lock
2821  *
2822  * @param cls the ListenSocket
2823  * @param domain the domain name of the lock
2824  * @param lock the app_port
2825  * @param status the current status of the lock
2826  */
2827 static void
2828 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2829                        enum GNUNET_LOCKMANAGER_Status status)
2830 {
2831   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2832
2833   GNUNET_assert (lock == (uint32_t) lsocket->port);
2834   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2835   {
2836     lsocket->listening = GNUNET_YES;
2837     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2838     {
2839       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2840       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2841     }
2842     if (NULL == lsocket->mesh)
2843     {
2844       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2845
2846       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2847                                            lsocket, /* Closure */
2848                                            &new_tunnel_notify,
2849                                            &tunnel_cleaner,
2850                                            server_message_handlers,
2851                                            ports);
2852       GNUNET_assert (NULL != lsocket->mesh);
2853       if (NULL != lsocket->listen_ok_cb)
2854       {
2855         (void) lsocket->listen_ok_cb ();
2856       }
2857     }
2858   }
2859   if (GNUNET_LOCKMANAGER_RELEASE == status)
2860     lsocket->listening = GNUNET_NO;
2861 }
2862
2863
2864 /*****************/
2865 /* API functions */
2866 /*****************/
2867
2868
2869 /**
2870  * Tries to open a stream to the target peer
2871  *
2872  * @param cfg configuration to use
2873  * @param target the target peer to which the stream has to be opened
2874  * @param app_port the application port number which uniquely identifies this
2875  *            stream
2876  * @param open_cb this function will be called after stream has be established;
2877  *          cannot be NULL
2878  * @param open_cb_cls the closure for open_cb
2879  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2880  * @return if successful it returns the stream socket; NULL if stream cannot be
2881  *         opened 
2882  */
2883 struct GNUNET_STREAM_Socket *
2884 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2885                     const struct GNUNET_PeerIdentity *target,
2886                     GNUNET_MESH_ApplicationType app_port,
2887                     GNUNET_STREAM_OpenCallback open_cb,
2888                     void *open_cb_cls,
2889                     ...)
2890 {
2891   struct GNUNET_STREAM_Socket *socket;
2892   enum GNUNET_STREAM_Option option;
2893   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2894   va_list vargs;
2895   uint16_t payload_size;
2896
2897   LOG (GNUNET_ERROR_TYPE_DEBUG,
2898        "%s\n", __func__);
2899   GNUNET_assert (NULL != open_cb);
2900   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2901   socket->other_peer = *target;
2902   socket->open_cb = open_cb;
2903   socket->open_cls = open_cb_cls;
2904   /* Set defaults */
2905   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2906   socket->testing_active = GNUNET_NO;
2907   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2908   va_start (vargs, open_cb_cls); /* Parse variable args */
2909   do {
2910     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2911     switch (option)
2912     {
2913     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2914       /* Expect struct GNUNET_TIME_Relative */
2915       socket->retransmit_timeout = va_arg (vargs,
2916                                            struct GNUNET_TIME_Relative);
2917       break;
2918     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2919       socket->testing_active = GNUNET_YES;
2920       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2921                                                                 uint32_t);
2922       break;
2923     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2924       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2925       break;
2926     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2927       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2928       break;
2929     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2930       payload_size = (uint16_t) va_arg (vargs, unsigned int);
2931       GNUNET_assert (0 != payload_size);
2932       if (payload_size < socket->max_payload_size)
2933         socket->max_payload_size = payload_size;
2934       break;
2935     case GNUNET_STREAM_OPTION_END:
2936       break;
2937     }
2938   } while (GNUNET_STREAM_OPTION_END != option);
2939   va_end (vargs);               /* End of variable args parsing */
2940   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2941                                       socket, /* cls */
2942                                       NULL, /* No inbound tunnel handler */
2943                                       NULL, /* No in-tunnel cleaner */
2944                                       client_message_handlers,
2945                                       ports); /* We don't get inbound tunnels */
2946   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2947   {
2948     GNUNET_free (socket);
2949     return NULL;
2950   }
2951   /* Now create the mesh tunnel to target */
2952   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2953   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2954                                               NULL, /* Tunnel context */
2955                                               &mesh_peer_connect_callback,
2956                                               &mesh_peer_disconnect_callback,
2957                                               socket);
2958   GNUNET_assert (NULL != socket->tunnel);
2959   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2960                                         &socket->other_peer);
2961   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2962   return socket;
2963 }
2964
2965
2966 /**
2967  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2968  *
2969  * @param socket the stream socket
2970  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2971  * @param completion_cb the callback that will be called upon successful
2972  *          shutdown of given operation
2973  * @param completion_cls the closure for the completion callback
2974  * @return the shutdown handle
2975  */
2976 struct GNUNET_STREAM_ShutdownHandle *
2977 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2978                         int operation,
2979                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2980                         void *completion_cls)
2981 {
2982   struct GNUNET_STREAM_ShutdownHandle *handle;
2983   struct GNUNET_STREAM_MessageHeader *msg;
2984   
2985   GNUNET_assert (NULL == socket->shutdown_handle);
2986
2987   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
2988   handle->socket = socket;
2989   handle->completion_cb = completion_cb;
2990   handle->completion_cls = completion_cls;
2991   socket->shutdown_handle = handle;
2992
2993   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2994   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2995   switch (operation)
2996   {
2997   case SHUT_RD:
2998     handle->operation = SHUT_RD;
2999     if (NULL != socket->read_handle)
3000       LOG (GNUNET_ERROR_TYPE_WARNING,
3001            "Existing read handle should be cancelled before shutting"
3002            " down reading\n");
3003     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3004     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3005                    GNUNET_NO);
3006     break;
3007   case SHUT_WR:
3008     handle->operation = SHUT_WR;
3009     if (NULL != socket->write_handle)
3010       LOG (GNUNET_ERROR_TYPE_WARNING,
3011            "Existing write handle should be cancelled before shutting"
3012            " down writing\n");
3013     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3014     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3015                    GNUNET_NO);
3016     break;
3017   case SHUT_RDWR:
3018     handle->operation = SHUT_RDWR;
3019     if (NULL != socket->write_handle)
3020       LOG (GNUNET_ERROR_TYPE_WARNING,
3021            "Existing write handle should be cancelled before shutting"
3022            " down writing\n");
3023     if (NULL != socket->read_handle)
3024       LOG (GNUNET_ERROR_TYPE_WARNING,
3025            "Existing read handle should be cancelled before shutting"
3026            " down reading\n");
3027     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3028     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3029     break;
3030   default:
3031     LOG (GNUNET_ERROR_TYPE_WARNING,
3032          "GNUNET_STREAM_shutdown called with invalid value for "
3033          "parameter operation -- Ignoring\n");
3034     GNUNET_free (msg);
3035     GNUNET_free (handle);
3036     return NULL;
3037   }
3038   handle->close_msg_retransmission_task_id =
3039     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3040                                   &close_msg_retransmission_task,
3041                                   handle);
3042   return handle;
3043 }
3044
3045
3046 /**
3047  * Cancels a pending shutdown
3048  *
3049  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3050  */
3051 void
3052 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3053 {
3054   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3055     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3056   handle->socket->shutdown_handle = NULL;
3057   GNUNET_free (handle);
3058 }
3059
3060
3061 /**
3062  * Closes the stream
3063  *
3064  * @param socket the stream socket
3065  */
3066 void
3067 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3068 {
3069   struct MessageQueue *head;
3070
3071   if (NULL != socket->read_handle)
3072   {
3073     LOG (GNUNET_ERROR_TYPE_WARNING,
3074          "Closing STREAM socket when a read handle is pending\n");
3075     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3076   }
3077   if (NULL != socket->write_handle)
3078   {
3079     LOG (GNUNET_ERROR_TYPE_WARNING,
3080          "Closing STREAM socket when a write handle is pending\n");
3081     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3082     //socket->write_handle = NULL;
3083   }
3084   /* Terminate the ack'ing task if they are still present */
3085   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3086   {
3087     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3088     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3089   }
3090   /* Terminate the control retransmission tasks */
3091   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3092   {
3093     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3094   }
3095   /* Clear Transmit handles */
3096   if (NULL != socket->transmit_handle)
3097   {
3098     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3099     socket->transmit_handle = NULL;
3100   }
3101   /* Clear existing message queue */
3102   while (NULL != (head = socket->queue_head)) {
3103     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3104                                  socket->queue_tail,
3105                                  head);
3106     GNUNET_free (head->message);
3107     GNUNET_free (head);
3108   }
3109   /* Close associated tunnel */
3110   if (NULL != socket->tunnel)
3111   {
3112     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3113     socket->tunnel = NULL;
3114   }
3115   /* Close mesh connection */
3116   if (NULL != socket->mesh && NULL == socket->lsocket)
3117   {
3118     GNUNET_MESH_disconnect (socket->mesh);
3119     socket->mesh = NULL;
3120   }  
3121   /* Release receive buffer */
3122   if (NULL != socket->receive_buffer)
3123   {
3124     GNUNET_free (socket->receive_buffer);
3125   }
3126   GNUNET_free (socket);
3127 }
3128
3129
3130 /**
3131  * Listens for stream connections for a specific application ports
3132  *
3133  * @param cfg the configuration to use
3134  * @param app_port the application port for which new streams will be accepted
3135  * @param listen_cb this function will be called when a peer tries to establish
3136  *            a stream with us
3137  * @param listen_cb_cls closure for listen_cb
3138  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3139  * @return listen socket, NULL for any error
3140  */
3141 struct GNUNET_STREAM_ListenSocket *
3142 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3143                       GNUNET_MESH_ApplicationType app_port,
3144                       GNUNET_STREAM_ListenCallback listen_cb,
3145                       void *listen_cb_cls,
3146                       ...)
3147 {
3148   /* FIXME: Add variable args for passing configration options? */
3149   struct GNUNET_STREAM_ListenSocket *lsocket;
3150   struct GNUNET_TIME_Relative listen_timeout;
3151   enum GNUNET_STREAM_Option option;
3152   va_list vargs;
3153   uint16_t payload_size;
3154
3155   GNUNET_assert (NULL != listen_cb);
3156   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3157   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3158   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3159   if (NULL == lsocket->lockmanager)
3160   {
3161     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3162     GNUNET_free (lsocket);
3163     return NULL;
3164   }
3165   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3166   /* Set defaults */
3167   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3168   lsocket->testing_active = GNUNET_NO;
3169   lsocket->listen_ok_cb = NULL;
3170   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3171   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3172   va_start (vargs, listen_cb_cls);
3173   do {
3174     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3175     switch (option)
3176     {
3177     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3178       lsocket->retransmit_timeout = va_arg (vargs,
3179                                             struct GNUNET_TIME_Relative);
3180       break;
3181     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3182       lsocket->testing_active = GNUNET_YES;
3183       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3184                                                                  uint32_t);
3185       break;
3186     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3187       listen_timeout = GNUNET_TIME_relative_multiply
3188         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3189       break;
3190     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3191       lsocket->listen_ok_cb = va_arg (vargs,
3192                                       GNUNET_STREAM_ListenSuccessCallback);
3193       break;
3194     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3195       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3196       GNUNET_assert (0 != payload_size);
3197       if (payload_size < lsocket->max_payload_size)
3198         lsocket->max_payload_size = payload_size;
3199       break;
3200     case GNUNET_STREAM_OPTION_END:
3201       break;
3202     }
3203   } while (GNUNET_STREAM_OPTION_END != option);
3204   va_end (vargs);
3205   lsocket->port = app_port;
3206   lsocket->listen_cb = listen_cb;
3207   lsocket->listen_cb_cls = listen_cb_cls;
3208   lsocket->locking_request = 
3209     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3210                                      (uint32_t) lsocket->port,
3211                                      &lock_status_change_cb, lsocket);
3212   lsocket->lockmanager_acquire_timeout_task =
3213     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3214                                   &lockmanager_acquire_timeout, lsocket);
3215   return lsocket;
3216 }
3217
3218
3219 /**
3220  * Closes the listen socket
3221  *
3222  * @param lsocket the listen socket
3223  */
3224 void
3225 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3226 {
3227   /* Close MESH connection */
3228   if (NULL != lsocket->mesh)
3229     GNUNET_MESH_disconnect (lsocket->mesh);
3230   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3231   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3232     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3233   if (NULL != lsocket->locking_request)
3234     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3235   if (NULL != lsocket->lockmanager)
3236     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3237   GNUNET_free (lsocket);
3238 }
3239
3240
3241 /**
3242  * Tries to write the given data to the stream. The maximum size of data that
3243  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3244  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3245  * violation, however only the said number of maximum bytes will be written.
3246  *
3247  * @param socket the socket representing a stream
3248  * @param data the data buffer from where the data is written into the stream
3249  * @param size the number of bytes to be written from the data buffer
3250  * @param timeout the timeout period
3251  * @param write_cont the function to call upon writing some bytes into the
3252  *          stream 
3253  * @param write_cont_cls the closure
3254  *
3255  * @return handle to cancel the operation; if a previous write is pending or
3256  *           the stream has been shutdown for this operation then write_cont is
3257  *           immediately called and NULL is returned.
3258  */
3259 struct GNUNET_STREAM_IOWriteHandle *
3260 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3261                      const void *data,
3262                      size_t size,
3263                      struct GNUNET_TIME_Relative timeout,
3264                      GNUNET_STREAM_CompletionContinuation write_cont,
3265                      void *write_cont_cls)
3266 {
3267   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3268   struct GNUNET_STREAM_DataMessage *data_msg;
3269   const void *sweep;
3270   struct GNUNET_TIME_Relative ack_deadline;
3271   unsigned int num_needed_packets;
3272   unsigned int packet;
3273   uint32_t packet_size;
3274   uint32_t payload_size;
3275   uint16_t max_data_packet_size;
3276
3277   LOG (GNUNET_ERROR_TYPE_DEBUG,
3278        "%s\n", __func__);
3279   if (NULL != socket->write_handle)
3280   {
3281     GNUNET_break (0);
3282     return NULL;
3283   }
3284   switch (socket->state)
3285   {
3286   case STATE_TRANSMIT_CLOSED:
3287   case STATE_TRANSMIT_CLOSE_WAIT:
3288   case STATE_CLOSED:
3289   case STATE_CLOSE_WAIT:
3290     if (NULL != write_cont)
3291       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3292     LOG (GNUNET_ERROR_TYPE_DEBUG,
3293          "%s() END\n", __func__);
3294     return NULL;
3295   case STATE_INIT:
3296   case STATE_LISTEN:
3297   case STATE_HELLO_WAIT:
3298     if (NULL != write_cont)
3299       /* FIXME: GNUNET_STREAM_SYSERR?? */
3300       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3301     LOG (GNUNET_ERROR_TYPE_DEBUG,
3302          "%s() END\n", __func__);
3303     return NULL;
3304   case STATE_ESTABLISHED:
3305   case STATE_RECEIVE_CLOSED:
3306   case STATE_RECEIVE_CLOSE_WAIT:
3307     break;
3308   }
3309   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3310     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3311   num_needed_packets =
3312       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3313   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3314   io_handle->socket = socket;
3315   io_handle->write_cont = write_cont;
3316   io_handle->write_cont_cls = write_cont_cls;
3317   io_handle->size = size;
3318   io_handle->packets_sent = 0;
3319   sweep = data;
3320   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3321      determined from RTT */
3322   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3323   /* Divide the given buffer into packets for sending */
3324   max_data_packet_size =
3325       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3326   for (packet=0; packet < num_needed_packets; packet++)
3327   {
3328     if ((packet + 1) * socket->max_payload_size < size) 
3329     {
3330       payload_size = socket->max_payload_size;
3331       packet_size = max_data_packet_size;
3332     }
3333     else 
3334     {
3335       payload_size = size - packet * socket->max_payload_size;
3336       packet_size = 
3337           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3338     }
3339     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3340     io_handle->messages[packet]->header.header.size = htons (packet_size);
3341     io_handle->messages[packet]->header.header.type =
3342       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3343     io_handle->messages[packet]->sequence_number =
3344       htonl (socket->write_sequence_number++);
3345     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3346     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3347        determined from RTT */
3348     io_handle->messages[packet]->ack_deadline =
3349       GNUNET_TIME_relative_hton (ack_deadline);
3350     data_msg = io_handle->messages[packet];
3351     /* Copy data from given buffer to the packet */
3352     memcpy (&data_msg[1], sweep, payload_size);
3353     sweep += payload_size;
3354     socket->write_offset += payload_size;
3355   }
3356   /* ack the last data message. FIXME: remove when we figure out how to do this
3357      using RTT */
3358   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3359       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3360   socket->write_handle = io_handle;
3361   write_data (socket);
3362   LOG (GNUNET_ERROR_TYPE_DEBUG,
3363        "%s() END\n", __func__);
3364   return io_handle;
3365 }
3366
3367
3368 /**
3369  * Tries to read data from the stream.
3370  *
3371  * @param socket the socket representing a stream
3372  * @param timeout the timeout period
3373  * @param proc function to call with data (once only)
3374  * @param proc_cls the closure for proc
3375  *
3376  * @return handle to cancel the operation; NULL is returned if: the stream has
3377  *           been shutdown for this type of opeartion (the DataProcessor is
3378  *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
3379  *           read handle is present (only one read handle per socket is present
3380  *           at any time)
3381  */
3382 struct GNUNET_STREAM_IOReadHandle *
3383 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3384                     struct GNUNET_TIME_Relative timeout,
3385                     GNUNET_STREAM_DataProcessor proc,
3386                     void *proc_cls)
3387 {
3388   struct GNUNET_STREAM_IOReadHandle *read_handle;
3389   
3390   LOG (GNUNET_ERROR_TYPE_DEBUG,
3391        "%s: %s()\n", 
3392        GNUNET_i2s (&socket->other_peer),
3393        __func__);
3394   /* Return NULL if there is already a read handle; the user has to cancel that
3395      first before continuing or has to wait until it is completed */
3396   if (NULL != socket->read_handle) 
3397     return NULL;
3398   GNUNET_assert (NULL != proc);
3399   switch (socket->state)
3400   {
3401   case STATE_RECEIVE_CLOSED:
3402   case STATE_RECEIVE_CLOSE_WAIT:
3403   case STATE_CLOSED:
3404   case STATE_CLOSE_WAIT:
3405     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3406     LOG (GNUNET_ERROR_TYPE_DEBUG,
3407          "%s: %s() END\n",
3408          GNUNET_i2s (&socket->other_peer),
3409          __func__);
3410     return NULL;
3411   default:
3412     break;
3413   }
3414   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3415   read_handle->proc = proc;
3416   read_handle->proc_cls = proc_cls;
3417   read_handle->socket = socket;
3418   socket->read_handle = read_handle;
3419   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3420                                           0))
3421     read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3422                                                           socket);   
3423   read_handle->read_io_timeout_task_id =
3424       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3425   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3426        GNUNET_i2s (&socket->other_peer), __func__);
3427   return read_handle;
3428 }
3429
3430
3431 /**
3432  * Cancel pending write operation.
3433  *
3434  * @param ioh handle to operation to cancel
3435  */
3436 void
3437 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3438 {
3439   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3440   unsigned int packet;
3441
3442   GNUNET_assert (NULL != socket->write_handle);
3443   GNUNET_assert (socket->write_handle == ioh);
3444
3445   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3446   {
3447     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3448     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3449   }
3450
3451   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3452   {
3453     if (NULL == ioh->messages[packet]) break;
3454     GNUNET_free (ioh->messages[packet]);
3455   }
3456       
3457   GNUNET_free (socket->write_handle);
3458   socket->write_handle = NULL;
3459 }
3460
3461
3462 /**
3463  * Cancel pending read operation.
3464  *
3465  * @param ioh handle to operation to cancel
3466  */
3467 void
3468 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3469 {
3470   struct GNUNET_STREAM_Socket *socket;
3471   
3472   socket = ioh->socket;
3473   GNUNET_assert (NULL != socket->read_handle);
3474   GNUNET_assert (ioh == socket->read_handle);
3475   /* Read io time task should be there; if it is already executed then this
3476   read handle is not valid; However upon scheduler shutdown the read io task
3477   may be executed before */
3478   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3479     GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3480   /* reading task may be present; if so we have to stop it */
3481   if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3482     GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3483   GNUNET_free (ioh);
3484   socket->read_handle = NULL;
3485 }
3486
3487 /* end of stream_api.c */