c618d3f2b6049b8370911a46395b12ee3a9f09c8
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37
38 #include "platform.h"
39 #include "gnunet_common.h"
40 #include "gnunet_crypto_lib.h"
41 #include "gnunet_lockmanager_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream_protocol.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_STREAM_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * Retransmission timeout
179    */
180   struct GNUNET_TIME_Relative retransmit_timeout;
181
182   /**
183    * The Acknowledgement Bitmap
184    */
185   GNUNET_STREAM_AckBitmap ack_bitmap;
186
187   /**
188    * Time when the Acknowledgement was queued
189    */
190   struct GNUNET_TIME_Absolute ack_time_registered;
191
192   /**
193    * Queued Acknowledgement deadline
194    */
195   struct GNUNET_TIME_Relative ack_time_deadline;
196
197   /**
198    * The mesh handle
199    */
200   struct GNUNET_MESH_Handle *mesh;
201
202   /**
203    * The mesh tunnel handle
204    */
205   struct GNUNET_MESH_Tunnel *tunnel;
206
207   /**
208    * Stream open closure
209    */
210   void *open_cls;
211
212   /**
213    * Stream open callback
214    */
215   GNUNET_STREAM_OpenCallback open_cb;
216
217   /**
218    * The current transmit handle (if a pending transmit request exists)
219    */
220   struct GNUNET_MESH_TransmitHandle *transmit_handle;
221
222   /**
223    * The current message associated with the transmit handle
224    */
225   struct MessageQueue *queue_head;
226
227   /**
228    * The queue tail, should always point to the last message in queue
229    */
230   struct MessageQueue *queue_tail;
231
232   /**
233    * The write IO_handle associated with this socket
234    */
235   struct GNUNET_STREAM_IOWriteHandle *write_handle;
236
237   /**
238    * The read IO_handle associated with this socket
239    */
240   struct GNUNET_STREAM_IOReadHandle *read_handle;
241
242   /**
243    * The shutdown handle associated with this socket
244    */
245   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
246
247   /**
248    * Buffer for storing received messages
249    */
250   void *receive_buffer;
251
252   /**
253    * The listen socket from which this socket is derived. Should be NULL if it
254    * is not a derived socket
255    */
256   struct GNUNET_STREAM_ListenSocket *lsocket;
257
258   /**
259    * The peer identity of the peer at the other end of the stream
260    */
261   struct GNUNET_PeerIdentity other_peer;
262
263   /**
264    * Task identifier for the read io timeout task
265    */
266   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
267
268   /**
269    * Task identifier for retransmission task after timeout
270    */
271   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
272
273   /**
274    * Task identifier for retransmission of control messages
275    */
276   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
277
278   /**
279    * The task for sending timely Acks
280    */
281   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
282
283   /**
284    * Task scheduled to continue a read operation.
285    */
286   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
287
288   /**
289    * The state of the protocol associated with this socket
290    */
291   enum State state;
292
293   /**
294    * The status of the socket
295    */
296   enum GNUNET_STREAM_Status status;
297
298   /**
299    * The number of previous timeouts; FIXME: currently not used
300    */
301   unsigned int retries;
302
303   /**
304    * The application port number (type: uint32_t)
305    */
306   GNUNET_MESH_ApplicationType app_port;
307
308   /**
309    * Whether testing mode is active or not
310    */
311   int testing_active;
312
313   /**
314    * The write sequence number to be set incase of testing
315    */
316   uint32_t testing_set_write_sequence_number_value;
317
318   /**
319    * The session id associated with this stream connection
320    * FIXME: Not used currently, may be removed
321    */
322   uint32_t session_id;
323
324   /**
325    * Write sequence number. Set to random when sending HELLO(client) and
326    * HELLO_ACK(server) 
327    */
328   uint32_t write_sequence_number;
329
330   /**
331    * Read sequence number. This number's value is determined during handshake
332    */
333   uint32_t read_sequence_number;
334
335   /**
336    * The receiver buffer size
337    */
338   uint32_t receive_buffer_size;
339
340   /**
341    * The receiver buffer boundaries
342    */
343   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
344
345   /**
346    * receiver's available buffer after the last acknowledged packet
347    */
348   uint32_t receiver_window_available;
349
350   /**
351    * The offset pointer used during write operation
352    */
353   uint32_t write_offset;
354
355   /**
356    * The offset after which we are expecting data
357    */
358   uint32_t read_offset;
359
360   /**
361    * The offset upto which user has read from the received buffer
362    */
363   uint32_t copy_offset;
364
365   /**
366    * The maximum size of the data message payload this stream handle can send
367    */
368   uint16_t max_payload_size;
369 };
370
371
372 /**
373  * A socket for listening
374  */
375 struct GNUNET_STREAM_ListenSocket
376 {
377   /**
378    * The mesh handle
379    */
380   struct GNUNET_MESH_Handle *mesh;
381
382   /**
383    * Our configuration
384    */
385   struct GNUNET_CONFIGURATION_Handle *cfg;
386
387   /**
388    * Handle to the lock manager service
389    */
390   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
391
392   /**
393    * The active LockingRequest from lockmanager
394    */
395   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
396
397   /**
398    * Callback to call after acquring a lock and listening
399    */
400   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
401
402   /**
403    * The callback function which is called after successful opening socket
404    */
405   GNUNET_STREAM_ListenCallback listen_cb;
406
407   /**
408    * The call back closure
409    */
410   void *listen_cb_cls;
411
412   /**
413    * The service port
414    */
415   GNUNET_MESH_ApplicationType port;
416   
417   /**
418    * The id of the lockmanager timeout task
419    */
420   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
421
422   /**
423    * The retransmit timeout
424    */
425   struct GNUNET_TIME_Relative retransmit_timeout;
426   
427   /**
428    * Listen enabled?
429    */
430   int listening;
431
432   /**
433    * Whether testing mode is active or not
434    */
435   int testing_active;
436
437   /**
438    * The write sequence number to be set incase of testing
439    */
440   uint32_t testing_set_write_sequence_number_value;
441
442   /**
443    * The maximum size of the data message payload this stream handle can send
444    */
445   uint16_t max_payload_size;
446
447 };
448
449
450 /**
451  * The IO Write Handle
452  */
453 struct GNUNET_STREAM_IOWriteHandle
454 {
455   /**
456    * The socket to which this write handle is associated
457    */
458   struct GNUNET_STREAM_Socket *socket;
459
460   /**
461    * The packet_buffers associated with this Handle
462    */
463   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
464
465   /**
466    * The write continuation callback
467    */
468   GNUNET_STREAM_CompletionContinuation write_cont;
469
470   /**
471    * Write continuation closure
472    */
473   void *write_cont_cls;
474
475   /**
476    * The bitmap of this IOHandle; Corresponding bit for a message is set when
477    * it has been acknowledged by the receiver
478    */
479   GNUNET_STREAM_AckBitmap ack_bitmap;
480
481   /**
482    * Number of bytes in this write handle
483    */
484   size_t size;
485 };
486
487
488 /**
489  * The IO Read Handle
490  */
491 struct GNUNET_STREAM_IOReadHandle
492 {
493   /**
494    * The socket to which this read handle is associated
495    */
496   struct GNUNET_STREAM_Socket *socket;
497   
498   /**
499    * Callback for the read processor
500    */
501   GNUNET_STREAM_DataProcessor proc;
502
503   /**
504    * The closure pointer for the read processor callback
505    */
506   void *proc_cls;
507 };
508
509
510 /**
511  * Handle for Shutdown
512  */
513 struct GNUNET_STREAM_ShutdownHandle
514 {
515   /**
516    * The socket associated with this shutdown handle
517    */
518   struct GNUNET_STREAM_Socket *socket;
519
520   /**
521    * Shutdown completion callback
522    */
523   GNUNET_STREAM_ShutdownCompletion completion_cb;
524
525   /**
526    * Closure for completion callback
527    */
528   void *completion_cls;
529
530   /**
531    * Close message retransmission task id
532    */
533   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
534
535   /**
536    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
537    */
538   int operation;  
539 };
540
541
542 /**
543  * Default value in seconds for various timeouts
544  */
545 static const unsigned int default_timeout = 10;
546
547 /**
548  * The domain name for locks we use here
549  */
550 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
551
552
553 /**
554  * Callback function for sending queued message
555  *
556  * @param cls closure the socket
557  * @param size number of bytes available in buf
558  * @param buf where the callee should write the message
559  * @return number of bytes written to buf
560  */
561 static size_t
562 send_message_notify (void *cls, size_t size, void *buf)
563 {
564   struct GNUNET_STREAM_Socket *socket = cls;
565   struct MessageQueue *head;
566   size_t ret;
567
568   socket->transmit_handle = NULL; /* Remove the transmit handle */
569   head = socket->queue_head;
570   if (NULL == head)
571     return 0; /* just to be safe */
572   if (0 == size)                /* request timed out */
573   {
574     socket->retries++;
575     LOG (GNUNET_ERROR_TYPE_DEBUG,
576          "%s: Message sending timed out. Retry %d \n",
577          GNUNET_i2s (&socket->other_peer),
578          socket->retries);
579     socket->transmit_handle = 
580       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
581                                          GNUNET_NO, /* Corking */
582                                          /* FIXME: exponential backoff */
583                                          socket->retransmit_timeout,
584                                          &socket->other_peer,
585                                          ntohs (head->message->header.size),
586                                          &send_message_notify,
587                                          socket);
588     return 0;
589   }
590   ret = ntohs (head->message->header.size);
591   GNUNET_assert (size >= ret);
592   memcpy (buf, head->message, ret);
593   if (NULL != head->finish_cb)
594   {
595     head->finish_cb (head->finish_cb_cls, socket);
596   }
597   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
598                                socket->queue_tail,
599                                head);
600   GNUNET_free (head->message);
601   GNUNET_free (head);
602   head = socket->queue_head;
603   if (NULL != head)    /* more pending messages to send */
604   {
605     socket->retries = 0;
606     socket->transmit_handle = 
607       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
608                                          GNUNET_NO, /* Corking */
609                                          /* FIXME: exponential backoff */
610                                          socket->retransmit_timeout,
611                                          &socket->other_peer,
612                                          ntohs (head->message->header.size),
613                                          &send_message_notify,
614                                          socket);
615   }
616   return ret;
617 }
618
619
620 /**
621  * Queues a message for sending using the mesh connection of a socket
622  *
623  * @param socket the socket whose mesh connection is used
624  * @param message the message to be sent
625  * @param finish_cb the callback to be called when the message is sent
626  * @param finish_cb_cls the closure for the callback
627  * @param urgent set to GNUNET_YES to add the message to the beginning of the
628  *          queue; GNUNET_NO to add at the tail
629  */
630 static void
631 queue_message (struct GNUNET_STREAM_Socket *socket,
632                struct GNUNET_STREAM_MessageHeader *message,
633                SendFinishCallback finish_cb,
634                void *finish_cb_cls,
635                int urgent)
636 {
637   struct MessageQueue *queue_entity;
638
639   GNUNET_assert 
640     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
641      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
642   LOG (GNUNET_ERROR_TYPE_DEBUG,
643        "%s: Queueing message of type %d and size %d\n",
644        GNUNET_i2s (&socket->other_peer),
645        ntohs (message->header.type),
646        ntohs (message->header.size));
647   GNUNET_assert (NULL != message);
648   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
649   queue_entity->message = message;
650   queue_entity->finish_cb = finish_cb;
651   queue_entity->finish_cb_cls = finish_cb_cls;
652   if (GNUNET_YES == urgent)
653   {
654     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
655                                  queue_entity);
656     if (NULL != socket->transmit_handle)
657     {
658       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
659       socket->transmit_handle = NULL;
660     }
661   }
662   else
663     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
664                                       socket->queue_tail,
665                                       queue_entity);
666   if (NULL == socket->transmit_handle)
667   {
668     socket->retries = 0;
669     socket->transmit_handle = 
670       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
671                                          GNUNET_NO, /* Corking */
672                                          socket->retransmit_timeout,
673                                          &socket->other_peer,
674                                          ntohs (message->header.size),
675                                          &send_message_notify,
676                                          socket);
677   }
678 }
679
680
681 /**
682  * Copies a message and queues it for sending using the mesh connection of
683  * given socket 
684  *
685  * @param socket the socket whose mesh connection is used
686  * @param message the message to be sent
687  * @param finish_cb the callback to be called when the message is sent
688  * @param finish_cb_cls the closure for the callback
689  */
690 static void
691 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
692                         const struct GNUNET_STREAM_MessageHeader *message,
693                         SendFinishCallback finish_cb,
694                         void *finish_cb_cls)
695 {
696   struct GNUNET_STREAM_MessageHeader *msg_copy;
697   uint16_t size;
698   
699   size = ntohs (message->header.size);
700   msg_copy = GNUNET_malloc (size);
701   memcpy (msg_copy, message, size);
702   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
703 }
704
705
706 /**
707  * Writes data using the given socket. The amount of data written is limited by
708  * the receiver_window_size
709  *
710  * @param socket the socket to use
711  */
712 static void 
713 write_data (struct GNUNET_STREAM_Socket *socket);
714
715
716 /**
717  * Task for retransmitting data messages if they aren't ACK before their ack
718  * deadline 
719  *
720  * @param cls the socket
721  * @param tc the Task context
722  */
723 static void
724 data_retransmission_task (void *cls,
725                           const struct GNUNET_SCHEDULER_TaskContext *tc)
726 {
727   struct GNUNET_STREAM_Socket *socket = cls;
728   
729   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
730     return;
731   LOG (GNUNET_ERROR_TYPE_DEBUG,
732        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
733   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
734   write_data (socket);
735 }
736
737
738 /**
739  * Task for sending ACK message
740  *
741  * @param cls the socket
742  * @param tc the Task context
743  */
744 static void
745 ack_task (void *cls,
746           const struct GNUNET_SCHEDULER_TaskContext *tc)
747 {
748   struct GNUNET_STREAM_Socket *socket = cls;
749   struct GNUNET_STREAM_AckMessage *ack_msg;
750
751   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
752   {
753     return;
754   }
755   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
756   /* Create the ACK Message */
757   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
758   ack_msg->header.header.size = htons (sizeof (struct 
759                                                GNUNET_STREAM_AckMessage));
760   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
761   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
762   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
763   ack_msg->receive_window_remaining = 
764     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
765   /* Queue up ACK for immediate sending */
766   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
767 }
768
769
770 /**
771  * Retransmission task for shutdown messages
772  *
773  * @param cls the shutdown handle
774  * @param tc the Task Context
775  */
776 static void
777 close_msg_retransmission_task (void *cls,
778                                const struct GNUNET_SCHEDULER_TaskContext *tc)
779 {
780   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
781   struct GNUNET_STREAM_MessageHeader *msg;
782   struct GNUNET_STREAM_Socket *socket;
783
784   GNUNET_assert (NULL != shutdown_handle);
785   socket = shutdown_handle->socket;
786
787   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
788   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
789   switch (shutdown_handle->operation)
790   {
791   case SHUT_RDWR:
792     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
793     break;
794   case SHUT_RD:
795     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
796     break;
797   case SHUT_WR:
798     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
799     break;
800   default:
801     GNUNET_free (msg);
802     shutdown_handle->close_msg_retransmission_task_id = 
803       GNUNET_SCHEDULER_NO_TASK;
804     return;
805   }
806   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
807   shutdown_handle->close_msg_retransmission_task_id =
808     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
809                                   &close_msg_retransmission_task,
810                                   shutdown_handle);
811 }
812
813
814 /**
815  * Function to modify a bit in GNUNET_STREAM_AckBitmap
816  *
817  * @param bitmap the bitmap to modify
818  * @param bit the bit number to modify
819  * @param value GNUNET_YES to on, GNUNET_NO to off
820  */
821 static void
822 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
823                       unsigned int bit, 
824                       int value)
825 {
826   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
827   if (GNUNET_YES == value)
828     *bitmap |= (1LL << bit);
829   else
830     *bitmap &= ~(1LL << bit);
831 }
832
833
834 /**
835  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
836  *
837  * @param bitmap address of the bitmap that has to be checked
838  * @param bit the bit number to check
839  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
840  */
841 static uint8_t
842 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
843                       unsigned int bit)
844 {
845   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
846   return 0 != (*bitmap & (1LL << bit));
847 }
848
849
850 /**
851  * Writes data using the given socket. The amount of data written is limited by
852  * the receiver_window_size
853  *
854  * @param socket the socket to use
855  */
856 static void 
857 write_data (struct GNUNET_STREAM_Socket *socket)
858 {
859   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
860   int packet;                   /* Although an int, should never be negative */
861   int ack_packet;
862
863   ack_packet = -1;
864   /* Find the last acknowledged packet */
865   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
866   {      
867     if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
868                                             packet))
869       ack_packet = packet;        
870     else if (NULL == io_handle->messages[packet])
871       break;
872   }
873   /* Resend packets which weren't ack'ed */
874   for (packet=0; packet < ack_packet; packet++)
875   {
876     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
877                                            packet))
878     {
879       LOG (GNUNET_ERROR_TYPE_DEBUG,
880            "%s: Placing DATA message with sequence %u in send queue\n",
881            GNUNET_i2s (&socket->other_peer),
882            ntohl (io_handle->messages[packet]->sequence_number));
883       copy_and_queue_message (socket,
884                               &io_handle->messages[packet]->header,
885                               NULL,
886                               NULL);
887     }
888   }
889   packet = ack_packet + 1;
890   /* Now send new packets if there is enough buffer space */
891   while ( (NULL != io_handle->messages[packet]) &&
892           (socket->receiver_window_available 
893            >= ntohs (io_handle->messages[packet]->header.header.size)) &&
894           (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
895   {
896     socket->receiver_window_available -= 
897       ntohs (io_handle->messages[packet]->header.header.size);
898     LOG (GNUNET_ERROR_TYPE_DEBUG,
899          "%s: Placing DATA message with sequence %u in send queue\n",
900          GNUNET_i2s (&socket->other_peer),
901          ntohl (io_handle->messages[packet]->sequence_number));
902     copy_and_queue_message (socket,
903                             &io_handle->messages[packet]->header,
904                             NULL,
905                             NULL);
906     packet++;
907   }
908   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
909     socket->data_retransmission_task_id = 
910       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
911                                     (GNUNET_TIME_UNIT_SECONDS, 8),
912                                     &data_retransmission_task,
913                                     socket);
914 }
915
916
917 /**
918  * Task for calling the read processor
919  *
920  * @param cls the socket
921  * @param tc the task context
922  */
923 static void
924 call_read_processor (void *cls,
925                      const struct GNUNET_SCHEDULER_TaskContext *tc)
926 {
927   struct GNUNET_STREAM_Socket *socket = cls;
928   size_t read_size;
929   size_t valid_read_size;
930   unsigned int packet;
931   uint32_t sequence_increase;
932   uint32_t offset_increase;
933
934   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
935   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
936     return;
937
938   if (NULL == socket->receive_buffer) 
939     return;
940
941   GNUNET_assert (NULL != socket->read_handle);
942   GNUNET_assert (NULL != socket->read_handle->proc);
943
944   /* Check the bitmap for any holes */
945   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
946   {
947     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
948                                            packet))
949       break;
950   }
951   /* We only call read processor if we have the first packet */
952   GNUNET_assert (0 < packet);
953   valid_read_size = 
954     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
955   GNUNET_assert (0 != valid_read_size);
956   /* Cancel the read_io_timeout_task */
957   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
958   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
959   /* Call the data processor */
960   LOG (GNUNET_ERROR_TYPE_DEBUG,
961        "%s: Calling read processor\n",
962        GNUNET_i2s (&socket->other_peer));
963   read_size = 
964     socket->read_handle->proc (socket->read_handle->proc_cls,
965                                socket->status,
966                                socket->receive_buffer + socket->copy_offset,
967                                valid_read_size);
968   LOG (GNUNET_ERROR_TYPE_DEBUG,
969        "%s: Read processor read %d bytes\n",
970        GNUNET_i2s (&socket->other_peer), read_size);
971   LOG (GNUNET_ERROR_TYPE_DEBUG,
972        "%s: Read processor completed successfully\n",
973        GNUNET_i2s (&socket->other_peer));
974   /* Free the read handle */
975   GNUNET_free (socket->read_handle);
976   socket->read_handle = NULL;
977   GNUNET_assert (read_size <= valid_read_size);
978   socket->copy_offset += read_size;
979   /* Determine upto which packet we can remove from the buffer */
980   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
981   {
982     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
983     { packet++; break; }
984     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
985       break;
986   }
987
988   /* If no packets can be removed we can't move the buffer */
989   if (0 == packet) return;
990   sequence_increase = packet;
991   LOG (GNUNET_ERROR_TYPE_DEBUG,
992        "%s: Sequence increase after read processor completion: %u\n",
993        GNUNET_i2s (&socket->other_peer), sequence_increase);
994
995   /* Shift the data in the receive buffer */
996   socket->receive_buffer = 
997     memmove (socket->receive_buffer,
998              socket->receive_buffer 
999              + socket->receive_buffer_boundaries[sequence_increase-1],
1000              socket->receive_buffer_size
1001              - socket->receive_buffer_boundaries[sequence_increase-1]);
1002   /* Shift the bitmap */
1003   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1004   /* Set read_sequence_number */
1005   socket->read_sequence_number += sequence_increase;
1006   /* Set read_offset */
1007   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1008   socket->read_offset += offset_increase;
1009   /* Fix copy_offset */
1010   GNUNET_assert (offset_increase <= socket->copy_offset);
1011   socket->copy_offset -= offset_increase;
1012   /* Fix relative boundaries */
1013   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1014   {
1015     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
1016     {
1017       uint32_t ahead_buffer_boundary;
1018
1019       ahead_buffer_boundary = 
1020         socket->receive_buffer_boundaries[packet + sequence_increase];
1021       if (0 == ahead_buffer_boundary)
1022         socket->receive_buffer_boundaries[packet] = 0;
1023       else
1024       {
1025         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1026         socket->receive_buffer_boundaries[packet] = 
1027           ahead_buffer_boundary - offset_increase;
1028       }
1029     }
1030     else
1031       socket->receive_buffer_boundaries[packet] = 0;
1032   }
1033 }
1034
1035
1036 /**
1037  * Cancels the existing read io handle
1038  *
1039  * @param cls the closure from the SCHEDULER call
1040  * @param tc the task context
1041  */
1042 static void
1043 read_io_timeout (void *cls,
1044                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1045 {
1046   struct GNUNET_STREAM_Socket *socket = cls;
1047   GNUNET_STREAM_DataProcessor proc;
1048   void *proc_cls;
1049
1050   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1051   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1052   {
1053     LOG (GNUNET_ERROR_TYPE_DEBUG,
1054          "%s: Read task timedout - Cancelling it\n",
1055          GNUNET_i2s (&socket->other_peer));
1056     GNUNET_SCHEDULER_cancel (socket->read_task_id);
1057     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1058   }
1059   GNUNET_assert (NULL != socket->read_handle);
1060   proc = socket->read_handle->proc;
1061   proc_cls = socket->read_handle->proc_cls;
1062   GNUNET_free (socket->read_handle);
1063   socket->read_handle = NULL;
1064   /* Call the read processor to signal timeout */
1065   proc (proc_cls,
1066         GNUNET_STREAM_TIMEOUT,
1067         NULL,
1068         0);
1069 }
1070
1071
1072 /**
1073  * Handler for DATA messages; Same for both client and server
1074  *
1075  * @param socket the socket through which the ack was received
1076  * @param tunnel connection to the other end
1077  * @param sender who sent the message
1078  * @param msg the data message
1079  * @param atsi performance data for the connection
1080  * @return GNUNET_OK to keep the connection open,
1081  *         GNUNET_SYSERR to close it (signal serious error)
1082  */
1083 static int
1084 handle_data (struct GNUNET_STREAM_Socket *socket,
1085              struct GNUNET_MESH_Tunnel *tunnel,
1086              const struct GNUNET_PeerIdentity *sender,
1087              const struct GNUNET_STREAM_DataMessage *msg,
1088              const struct GNUNET_ATS_Information*atsi)
1089 {
1090   const void *payload;
1091   struct GNUNET_TIME_Relative ack_deadline_rel;
1092   uint32_t bytes_needed;
1093   uint32_t relative_offset;
1094   uint32_t relative_sequence_number;
1095   uint16_t size;
1096
1097   size = htons (msg->header.header.size);
1098   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1099   {
1100     GNUNET_break_op (0);
1101     return GNUNET_SYSERR;
1102   }
1103   if (0 != memcmp (sender, &socket->other_peer,
1104                    sizeof (struct GNUNET_PeerIdentity)))
1105   {
1106     LOG (GNUNET_ERROR_TYPE_DEBUG,
1107          "%s: Received DATA from non-confirming peer\n",
1108          GNUNET_i2s (&socket->other_peer));
1109     return GNUNET_YES;
1110   }
1111   switch (socket->state)
1112   {
1113   case STATE_ESTABLISHED:
1114   case STATE_TRANSMIT_CLOSED:
1115   case STATE_TRANSMIT_CLOSE_WAIT:      
1116     /* check if the message's sequence number is in the range we are
1117        expecting */
1118     relative_sequence_number = 
1119       ntohl (msg->sequence_number) - socket->read_sequence_number;
1120     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1121     {
1122       LOG (GNUNET_ERROR_TYPE_DEBUG,
1123            "%s: Ignoring received message with sequence number %u\n",
1124            GNUNET_i2s (&socket->other_peer),
1125            ntohl (msg->sequence_number));
1126       /* Start ACK sending task if one is not already present */
1127       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1128       {
1129         socket->ack_task_id = 
1130           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1131                                         (msg->ack_deadline),
1132                                         &ack_task,
1133                                         socket);
1134       }
1135       return GNUNET_YES;
1136     }      
1137     /* Check if we have already seen this message */
1138     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1139                                             relative_sequence_number))
1140     {
1141       LOG (GNUNET_ERROR_TYPE_DEBUG,
1142            "%s: Ignoring already received message with sequence number %u\n",
1143            GNUNET_i2s (&socket->other_peer),
1144            ntohl (msg->sequence_number));
1145       /* Start ACK sending task if one is not already present */
1146       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1147       {
1148         socket->ack_task_id = 
1149           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1150                                         (msg->ack_deadline), &ack_task, socket);
1151       }
1152       return GNUNET_YES;
1153     }
1154     LOG (GNUNET_ERROR_TYPE_DEBUG,
1155          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1156          GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1157          ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1158     /* Check if we have to allocate the buffer */
1159     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1160     relative_offset = ntohl (msg->offset) - socket->read_offset;
1161     bytes_needed = relative_offset + size;
1162     if (bytes_needed > socket->receive_buffer_size)
1163     {
1164       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1165       {
1166         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1167                                                  bytes_needed);
1168         socket->receive_buffer_size = bytes_needed;
1169       }
1170       else
1171       {
1172         LOG (GNUNET_ERROR_TYPE_DEBUG,
1173              "%s: Cannot accommodate packet %d as buffer is full\n",
1174              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1175         return GNUNET_YES;
1176       }
1177     }
1178     /* Copy Data to buffer */
1179     payload = &msg[1];
1180     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1181     memcpy (socket->receive_buffer + relative_offset, payload, size);
1182     socket->receive_buffer_boundaries[relative_sequence_number] = 
1183         relative_offset + size;
1184     /* Modify the ACK bitmap */
1185     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1186                           GNUNET_YES);
1187     /* Start ACK sending task if one is not already present */
1188     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1189     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1190     {
1191       ack_deadline_rel = 
1192           GNUNET_TIME_relative_min (ack_deadline_rel,
1193                                     GNUNET_TIME_relative_multiply
1194                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1195       socket->ack_task_id = 
1196           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1197                                         (msg->ack_deadline), &ack_task, socket);
1198       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1199       socket->ack_time_deadline = ack_deadline_rel;
1200     }
1201     else
1202     {
1203       struct GNUNET_TIME_Relative ack_time_past;
1204       struct GNUNET_TIME_Relative ack_time_remaining;
1205       struct GNUNET_TIME_Relative ack_time_min;
1206       ack_time_past = 
1207           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1208       ack_time_remaining = GNUNET_TIME_relative_subtract
1209           (socket->ack_time_deadline, ack_time_past);
1210       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1211                                                ack_deadline_rel);
1212       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1213                       sizeof (struct GNUNET_TIME_Relative)))
1214       {
1215         ack_deadline_rel = ack_time_min;
1216         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1217         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1218                                                             &ack_task, socket);
1219         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1220         socket->ack_time_deadline = ack_deadline_rel;
1221       }
1222     }
1223     if ((NULL != socket->read_handle) /* A read handle is waiting */
1224         /* There is no current read task */
1225         && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1226         /* We have the first packet */
1227         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1228     {
1229       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", 
1230            GNUNET_i2s (&socket->other_peer));          
1231       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
1232                                                        socket);
1233     }      
1234     break;
1235   default:
1236     LOG (GNUNET_ERROR_TYPE_DEBUG,
1237          "%s: Received data message when it cannot be handled\n",
1238          GNUNET_i2s (&socket->other_peer));
1239     break;
1240   }
1241   return GNUNET_YES;
1242 }
1243
1244
1245 /**
1246  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1247  *
1248  * @param cls the socket (set from GNUNET_MESH_connect)
1249  * @param tunnel connection to the other end
1250  * @param tunnel_ctx place to store local state associated with the tunnel
1251  * @param sender who sent the message
1252  * @param message the actual message
1253  * @param atsi performance data for the connection
1254  * @return GNUNET_OK to keep the connection open,
1255  *         GNUNET_SYSERR to close it (signal serious error)
1256  */
1257 static int
1258 client_handle_data (void *cls,
1259                     struct GNUNET_MESH_Tunnel *tunnel,
1260                     void **tunnel_ctx,
1261                     const struct GNUNET_PeerIdentity *sender,
1262                     const struct GNUNET_MessageHeader *message,
1263                     const struct GNUNET_ATS_Information*atsi)
1264 {
1265   struct GNUNET_STREAM_Socket *socket = cls;
1266
1267   return handle_data (socket, tunnel, sender, 
1268                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1269 }
1270
1271
1272 /**
1273  * Callback to set state to ESTABLISHED
1274  *
1275  * @param cls the closure NULL;
1276  * @param socket the socket to requiring state change
1277  */
1278 static void
1279 set_state_established (void *cls,
1280                        struct GNUNET_STREAM_Socket *socket)
1281 {
1282   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1283        "%s: Attaining ESTABLISHED state\n",
1284        GNUNET_i2s (&socket->other_peer));
1285   socket->write_offset = 0;
1286   socket->read_offset = 0;
1287   socket->state = STATE_ESTABLISHED;
1288   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1289                  socket->control_retransmission_task_id);
1290   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1291   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1292   if (NULL != socket->lsocket)
1293   {
1294     LOG (GNUNET_ERROR_TYPE_DEBUG,
1295          "%s: Calling listen callback\n",
1296          GNUNET_i2s (&socket->other_peer));
1297     if (GNUNET_SYSERR == 
1298         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1299                                     socket,
1300                                     &socket->other_peer))
1301     {
1302       socket->state = STATE_CLOSED;
1303       /* FIXME: We should close in a decent way (send RST) */
1304       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1305       GNUNET_free (socket);
1306     }
1307   }
1308   else
1309     socket->open_cb (socket->open_cls, socket);
1310 }
1311
1312
1313 /**
1314  * Callback to set state to HELLO_WAIT
1315  *
1316  * @param cls the closure from queue_message
1317  * @param socket the socket to requiring state change
1318  */
1319 static void
1320 set_state_hello_wait (void *cls,
1321                       struct GNUNET_STREAM_Socket *socket)
1322 {
1323   GNUNET_assert (STATE_INIT == socket->state);
1324   LOG (GNUNET_ERROR_TYPE_DEBUG,
1325        "%s: Attaining HELLO_WAIT state\n",
1326        GNUNET_i2s (&socket->other_peer));
1327   socket->state = STATE_HELLO_WAIT;
1328 }
1329
1330
1331 /**
1332  * Callback to set state to CLOSE_WAIT
1333  *
1334  * @param cls the closure from queue_message
1335  * @param socket the socket requiring state change
1336  */
1337 static void
1338 set_state_close_wait (void *cls,
1339                       struct GNUNET_STREAM_Socket *socket)
1340 {
1341   LOG (GNUNET_ERROR_TYPE_DEBUG,
1342        "%s: Attaing CLOSE_WAIT state\n",
1343        GNUNET_i2s (&socket->other_peer));
1344   socket->state = STATE_CLOSE_WAIT;
1345   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1346   socket->receive_buffer = NULL;
1347   socket->receive_buffer_size = 0;
1348 }
1349
1350
1351 /**
1352  * Callback to set state to RECEIVE_CLOSE_WAIT
1353  *
1354  * @param cls the closure from queue_message
1355  * @param socket the socket requiring state change
1356  */
1357 static void
1358 set_state_receive_close_wait (void *cls,
1359                               struct GNUNET_STREAM_Socket *socket)
1360 {
1361   LOG (GNUNET_ERROR_TYPE_DEBUG,
1362        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1363        GNUNET_i2s (&socket->other_peer));
1364   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1365   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1366   socket->receive_buffer = NULL;
1367   socket->receive_buffer_size = 0;
1368 }
1369
1370
1371 /**
1372  * Callback to set state to TRANSMIT_CLOSE_WAIT
1373  *
1374  * @param cls the closure from queue_message
1375  * @param socket the socket requiring state change
1376  */
1377 static void
1378 set_state_transmit_close_wait (void *cls,
1379                                struct GNUNET_STREAM_Socket *socket)
1380 {
1381   LOG (GNUNET_ERROR_TYPE_DEBUG,
1382        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1383        GNUNET_i2s (&socket->other_peer));
1384   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1385 }
1386
1387
1388 /**
1389  * Callback to set state to CLOSED
1390  *
1391  * @param cls the closure from queue_message
1392  * @param socket the socket requiring state change
1393  */
1394 static void
1395 set_state_closed (void *cls,
1396                   struct GNUNET_STREAM_Socket *socket)
1397 {
1398   socket->state = STATE_CLOSED;
1399 }
1400
1401
1402 /**
1403  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1404  *
1405  * @return the generate hello message
1406  */
1407 static struct GNUNET_STREAM_MessageHeader *
1408 generate_hello (void)
1409 {
1410   struct GNUNET_STREAM_MessageHeader *msg;
1411
1412   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1413   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1414   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1415   return msg;
1416 }
1417
1418
1419 /**
1420  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1421  * socket
1422  *
1423  * @param socket the socket for which this HelloAckMessage has to be generated
1424  * @param generate_seq GNUNET_YES to generate the write sequence number,
1425  *          GNUNET_NO to use the existing sequence number
1426  * @return the HelloAckMessage
1427  */
1428 static struct GNUNET_STREAM_HelloAckMessage *
1429 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1430                     int generate_seq)
1431 {
1432   struct GNUNET_STREAM_HelloAckMessage *msg;
1433
1434   if (GNUNET_YES == generate_seq)
1435   {
1436     if (GNUNET_YES == socket->testing_active)
1437       socket->write_sequence_number =
1438         socket->testing_set_write_sequence_number_value;
1439     else
1440       socket->write_sequence_number = 
1441         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1442     LOG_DEBUG ("%s: write sequence number %u\n",
1443                GNUNET_i2s (&socket->other_peer),
1444                (unsigned int) socket->write_sequence_number);
1445   }
1446   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1447   msg->header.header.size = 
1448     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1449   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1450   msg->sequence_number = htonl (socket->write_sequence_number);
1451   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1452   return msg;
1453 }
1454
1455
1456 /**
1457  * Task for retransmitting control messages if they aren't ACK'ed before a
1458  * deadline
1459  *
1460  * @param cls the socket
1461  * @param tc the Task context
1462  */
1463 static void
1464 control_retransmission_task (void *cls,
1465                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1466 {
1467   struct GNUNET_STREAM_Socket *socket = cls;
1468     
1469   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1470     return;
1471   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1472   LOG_DEBUG ("%s: Retransmitting a control message\n",
1473                  GNUNET_i2s (&socket->other_peer));
1474   switch (socket->state)
1475   {
1476   case STATE_INIT:    
1477     GNUNET_break (0);
1478     break;
1479   case STATE_LISTEN:
1480     GNUNET_break (0);
1481     break;
1482   case STATE_HELLO_WAIT:
1483     if (NULL == socket->lsocket) /* We are client */
1484       queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1485     else
1486       queue_message (socket,
1487                      (struct GNUNET_STREAM_MessageHeader *)
1488                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1489                      GNUNET_NO);
1490     socket->control_retransmission_task_id =
1491     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1492                                   &control_retransmission_task, socket);
1493     break;
1494   case STATE_ESTABLISHED:
1495     if (NULL == socket->lsocket)
1496       queue_message (socket,
1497                      (struct GNUNET_STREAM_MessageHeader *)
1498                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1499                      GNUNET_NO);
1500     else
1501       GNUNET_break (0);
1502   default:
1503     GNUNET_break (0);
1504   }  
1505 }
1506
1507
1508 /**
1509  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1510  *
1511  * @param cls the socket (set from GNUNET_MESH_connect)
1512  * @param tunnel connection to the other end
1513  * @param tunnel_ctx this is NULL
1514  * @param sender who sent the message
1515  * @param message the actual message
1516  * @param atsi performance data for the connection
1517  * @return GNUNET_OK to keep the connection open,
1518  *         GNUNET_SYSERR to close it (signal serious error)
1519  */
1520 static int
1521 client_handle_hello_ack (void *cls,
1522                          struct GNUNET_MESH_Tunnel *tunnel,
1523                          void **tunnel_ctx,
1524                          const struct GNUNET_PeerIdentity *sender,
1525                          const struct GNUNET_MessageHeader *message,
1526                          const struct GNUNET_ATS_Information*atsi)
1527 {
1528   struct GNUNET_STREAM_Socket *socket = cls;
1529   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1530   struct GNUNET_STREAM_HelloAckMessage *reply;
1531
1532   if (0 != memcmp (sender, &socket->other_peer,
1533                    sizeof (struct GNUNET_PeerIdentity)))
1534   {
1535     LOG (GNUNET_ERROR_TYPE_DEBUG,
1536          "%s: Received HELLO_ACK from non-confirming peer\n",
1537          GNUNET_i2s (&socket->other_peer));
1538     return GNUNET_YES;
1539   }
1540   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1541   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1542        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1543   GNUNET_assert (socket->tunnel == tunnel);
1544   switch (socket->state)
1545   {
1546   case STATE_HELLO_WAIT:
1547     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1548     LOG (GNUNET_ERROR_TYPE_DEBUG,
1549          "%s: Read sequence number %u\n",
1550          GNUNET_i2s (&socket->other_peer),
1551          (unsigned int) socket->read_sequence_number);
1552     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1553     reply = generate_hello_ack (socket, GNUNET_YES);
1554     queue_message (socket, &reply->header, &set_state_established,
1555                    NULL, GNUNET_NO);    
1556     return GNUNET_OK;
1557   case STATE_ESTABLISHED:
1558     // call statistics (# ACKs ignored++)
1559     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1560                    socket->control_retransmission_task_id);
1561     socket->control_retransmission_task_id =
1562       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1563     return GNUNET_OK;
1564   default:
1565     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
1566                GNUNET_i2s (&socket->other_peer),
1567                GNUNET_i2s (&socket->other_peer), socket->state);
1568     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1569     return GNUNET_SYSERR;
1570   }
1571 }
1572
1573
1574 /**
1575  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1576  *
1577  * @param cls the socket (set from GNUNET_MESH_connect)
1578  * @param tunnel connection to the other end
1579  * @param tunnel_ctx this is NULL
1580  * @param sender who sent the message
1581  * @param message the actual message
1582  * @param atsi performance data for the connection
1583  * @return GNUNET_OK to keep the connection open,
1584  *         GNUNET_SYSERR to close it (signal serious error)
1585  */
1586 static int
1587 client_handle_reset (void *cls,
1588                      struct GNUNET_MESH_Tunnel *tunnel,
1589                      void **tunnel_ctx,
1590                      const struct GNUNET_PeerIdentity *sender,
1591                      const struct GNUNET_MessageHeader *message,
1592                      const struct GNUNET_ATS_Information*atsi)
1593 {
1594   // struct GNUNET_STREAM_Socket *socket = cls;
1595
1596   return GNUNET_OK;
1597 }
1598
1599
1600 /**
1601  * Common message handler for handling TRANSMIT_CLOSE messages
1602  *
1603  * @param socket the socket through which the ack was received
1604  * @param tunnel connection to the other end
1605  * @param sender who sent the message
1606  * @param msg the transmit close message
1607  * @param atsi performance data for the connection
1608  * @return GNUNET_OK to keep the connection open,
1609  *         GNUNET_SYSERR to close it (signal serious error)
1610  */
1611 static int
1612 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1613                        struct GNUNET_MESH_Tunnel *tunnel,
1614                        const struct GNUNET_PeerIdentity *sender,
1615                        const struct GNUNET_STREAM_MessageHeader *msg,
1616                        const struct GNUNET_ATS_Information*atsi)
1617 {
1618   struct GNUNET_STREAM_MessageHeader *reply;
1619
1620   switch (socket->state)
1621   {
1622   case STATE_ESTABLISHED:
1623     socket->state = STATE_RECEIVE_CLOSED;
1624     /* Send TRANSMIT_CLOSE_ACK */
1625     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1626     reply->header.type = 
1627       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1628     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1629     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1630     break;
1631   default:
1632     /* FIXME: Call statistics? */
1633     break;
1634   }
1635   return GNUNET_YES;
1636 }
1637
1638
1639 /**
1640  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1641  *
1642  * @param cls the socket (set from GNUNET_MESH_connect)
1643  * @param tunnel connection to the other end
1644  * @param tunnel_ctx this is NULL
1645  * @param sender who sent the message
1646  * @param message the actual message
1647  * @param atsi performance data for the connection
1648  * @return GNUNET_OK to keep the connection open,
1649  *         GNUNET_SYSERR to close it (signal serious error)
1650  */
1651 static int
1652 client_handle_transmit_close (void *cls,
1653                               struct GNUNET_MESH_Tunnel *tunnel,
1654                               void **tunnel_ctx,
1655                               const struct GNUNET_PeerIdentity *sender,
1656                               const struct GNUNET_MessageHeader *message,
1657                               const struct GNUNET_ATS_Information*atsi)
1658 {
1659   struct GNUNET_STREAM_Socket *socket = cls;
1660   
1661   return handle_transmit_close (socket,
1662                                 tunnel,
1663                                 sender,
1664                                 (struct GNUNET_STREAM_MessageHeader *)message,
1665                                 atsi);
1666 }
1667
1668
1669 /**
1670  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1671  *
1672  * @param socket the socket
1673  * @param tunnel connection to the other end
1674  * @param sender who sent the message
1675  * @param message the actual message
1676  * @param atsi performance data for the connection
1677  * @param operation the close operation which is being ACK'ed
1678  * @return GNUNET_OK to keep the connection open,
1679  *         GNUNET_SYSERR to close it (signal serious error)
1680  */
1681 static int
1682 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1683                           struct GNUNET_MESH_Tunnel *tunnel,
1684                           const struct GNUNET_PeerIdentity *sender,
1685                           const struct GNUNET_STREAM_MessageHeader *message,
1686                           const struct GNUNET_ATS_Information *atsi,
1687                           int operation)
1688 {
1689   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1690
1691   shutdown_handle = socket->shutdown_handle;
1692   if (NULL == shutdown_handle)
1693   {
1694     LOG (GNUNET_ERROR_TYPE_DEBUG,
1695          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1696          GNUNET_i2s (&socket->other_peer));
1697     return GNUNET_OK;
1698   }
1699   switch (operation)
1700   {
1701   case SHUT_RDWR:
1702     switch (socket->state)
1703     {
1704     case STATE_CLOSE_WAIT:
1705       if (SHUT_RDWR != shutdown_handle->operation)
1706       {
1707         LOG (GNUNET_ERROR_TYPE_DEBUG,
1708              "%s: Received CLOSE_ACK when shutdown handle is not for "
1709              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1710         return GNUNET_OK;
1711       }
1712       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1713            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1714       socket->state = STATE_CLOSED;
1715       break;
1716     default:
1717       LOG (GNUNET_ERROR_TYPE_DEBUG,
1718            "%s: Received CLOSE_ACK when in it not expected\n",
1719            GNUNET_i2s (&socket->other_peer));
1720       return GNUNET_OK;
1721     }
1722     break;
1723   case SHUT_RD:
1724     switch (socket->state)
1725     {
1726     case STATE_RECEIVE_CLOSE_WAIT:
1727       if (SHUT_RD != shutdown_handle->operation)
1728       {
1729         LOG (GNUNET_ERROR_TYPE_DEBUG,
1730              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1731              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1732         return GNUNET_OK;
1733       }
1734       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1735            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1736       socket->state = STATE_RECEIVE_CLOSED;
1737       break;
1738     default:
1739       LOG (GNUNET_ERROR_TYPE_DEBUG,
1740            "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1741            GNUNET_i2s (&socket->other_peer));
1742       return GNUNET_OK;
1743     }
1744     break;
1745   case SHUT_WR:
1746     switch (socket->state)
1747     {
1748     case STATE_TRANSMIT_CLOSE_WAIT:
1749       if (SHUT_WR != shutdown_handle->operation)
1750       {
1751         LOG (GNUNET_ERROR_TYPE_DEBUG,
1752              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1753              "is not for SHUT_WR\n",
1754              GNUNET_i2s (&socket->other_peer));
1755         return GNUNET_OK;
1756       }
1757       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1758            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1759       socket->state = STATE_TRANSMIT_CLOSED;
1760       break;
1761     default:
1762       LOG (GNUNET_ERROR_TYPE_DEBUG,
1763            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1764            GNUNET_i2s (&socket->other_peer));          
1765       return GNUNET_OK;
1766     }
1767     break;
1768   default:
1769     GNUNET_assert (0);
1770   }
1771   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1772     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1773                                    operation);
1774   if (GNUNET_SCHEDULER_NO_TASK
1775       != shutdown_handle->close_msg_retransmission_task_id)
1776   {
1777     GNUNET_SCHEDULER_cancel
1778       (shutdown_handle->close_msg_retransmission_task_id);
1779     shutdown_handle->close_msg_retransmission_task_id =
1780         GNUNET_SCHEDULER_NO_TASK;
1781   }
1782   GNUNET_free (shutdown_handle); /* Free shutdown handle */
1783   socket->shutdown_handle = NULL;
1784   return GNUNET_OK;
1785 }
1786
1787
1788 /**
1789  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1790  *
1791  * @param cls the socket (set from GNUNET_MESH_connect)
1792  * @param tunnel connection to the other end
1793  * @param tunnel_ctx this is NULL
1794  * @param sender who sent the message
1795  * @param message the actual message
1796  * @param atsi performance data for the connection
1797  * @return GNUNET_OK to keep the connection open,
1798  *         GNUNET_SYSERR to close it (signal serious error)
1799  */
1800 static int
1801 client_handle_transmit_close_ack (void *cls,
1802                                   struct GNUNET_MESH_Tunnel *tunnel,
1803                                   void **tunnel_ctx,
1804                                   const struct GNUNET_PeerIdentity *sender,
1805                                   const struct GNUNET_MessageHeader *message,
1806                                   const struct GNUNET_ATS_Information*atsi)
1807 {
1808   struct GNUNET_STREAM_Socket *socket = cls;
1809
1810   return handle_generic_close_ack (socket,
1811                                    tunnel,
1812                                    sender,
1813                                    (const struct GNUNET_STREAM_MessageHeader *)
1814                                    message,
1815                                    atsi,
1816                                    SHUT_WR);
1817 }
1818
1819
1820 /**
1821  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1822  *
1823  * @param socket the socket
1824  * @param tunnel connection to the other end
1825  * @param sender who sent the message
1826  * @param message the actual message
1827  * @param atsi performance data for the connection
1828  * @return GNUNET_OK to keep the connection open,
1829  *         GNUNET_SYSERR to close it (signal serious error)
1830  */
1831 static int
1832 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1833                       struct GNUNET_MESH_Tunnel *tunnel,
1834                       const struct GNUNET_PeerIdentity *sender,
1835                       const struct GNUNET_STREAM_MessageHeader *message,
1836                       const struct GNUNET_ATS_Information *atsi)
1837 {
1838   struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1839
1840   switch (socket->state)
1841   {
1842   case STATE_INIT:
1843   case STATE_LISTEN:
1844   case STATE_HELLO_WAIT:
1845     LOG (GNUNET_ERROR_TYPE_DEBUG,
1846          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1847          GNUNET_i2s (&socket->other_peer));
1848     return GNUNET_OK;
1849   default:
1850     break;
1851   }
1852   
1853   LOG (GNUNET_ERROR_TYPE_DEBUG,
1854        "%s: Received RECEIVE_CLOSE from %s\n",
1855        GNUNET_i2s (&socket->other_peer),
1856        GNUNET_i2s (&socket->other_peer));
1857   receive_close_ack =
1858     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1859   receive_close_ack->header.size =
1860     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1861   receive_close_ack->header.type =
1862     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1863   queue_message (socket, receive_close_ack, &set_state_closed,
1864                  NULL, GNUNET_NO);  
1865   /* FIXME: Handle the case where write handle is present; the write operation
1866      should be deemed as finised and the write continuation callback
1867      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1868   return GNUNET_OK;
1869 }
1870
1871
1872 /**
1873  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1874  *
1875  * @param cls the socket (set from GNUNET_MESH_connect)
1876  * @param tunnel connection to the other end
1877  * @param tunnel_ctx this is NULL
1878  * @param sender who sent the message
1879  * @param message the actual message
1880  * @param atsi performance data for the connection
1881  * @return GNUNET_OK to keep the connection open,
1882  *         GNUNET_SYSERR to close it (signal serious error)
1883  */
1884 static int
1885 client_handle_receive_close (void *cls,
1886                              struct GNUNET_MESH_Tunnel *tunnel,
1887                              void **tunnel_ctx,
1888                              const struct GNUNET_PeerIdentity *sender,
1889                              const struct GNUNET_MessageHeader *message,
1890                              const struct GNUNET_ATS_Information*atsi)
1891 {
1892   struct GNUNET_STREAM_Socket *socket = cls;
1893
1894   return
1895     handle_receive_close (socket,
1896                           tunnel,
1897                           sender,
1898                           (const struct GNUNET_STREAM_MessageHeader *) message,
1899                           atsi);
1900 }
1901
1902
1903 /**
1904  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1905  *
1906  * @param cls the socket (set from GNUNET_MESH_connect)
1907  * @param tunnel connection to the other end
1908  * @param tunnel_ctx this is NULL
1909  * @param sender who sent the message
1910  * @param message the actual message
1911  * @param atsi performance data for the connection
1912  * @return GNUNET_OK to keep the connection open,
1913  *         GNUNET_SYSERR to close it (signal serious error)
1914  */
1915 static int
1916 client_handle_receive_close_ack (void *cls,
1917                                  struct GNUNET_MESH_Tunnel *tunnel,
1918                                  void **tunnel_ctx,
1919                                  const struct GNUNET_PeerIdentity *sender,
1920                                  const struct GNUNET_MessageHeader *message,
1921                                  const struct GNUNET_ATS_Information*atsi)
1922 {
1923   struct GNUNET_STREAM_Socket *socket = cls;
1924
1925   return handle_generic_close_ack (socket,
1926                                    tunnel,
1927                                    sender,
1928                                    (const struct GNUNET_STREAM_MessageHeader *)
1929                                    message,
1930                                    atsi,
1931                                    SHUT_RD);
1932 }
1933
1934
1935 /**
1936  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1937  *
1938  * @param socket the socket
1939  * @param tunnel connection to the other end
1940  * @param sender who sent the message
1941  * @param message the actual message
1942  * @param atsi performance data for the connection
1943  * @return GNUNET_OK to keep the connection open,
1944  *         GNUNET_SYSERR to close it (signal serious error)
1945  */
1946 static int
1947 handle_close (struct GNUNET_STREAM_Socket *socket,
1948               struct GNUNET_MESH_Tunnel *tunnel,
1949               const struct GNUNET_PeerIdentity *sender,
1950               const struct GNUNET_STREAM_MessageHeader *message,
1951               const struct GNUNET_ATS_Information*atsi)
1952 {
1953   struct GNUNET_STREAM_MessageHeader *close_ack;
1954
1955   switch (socket->state)
1956   {
1957   case STATE_INIT:
1958   case STATE_LISTEN:
1959   case STATE_HELLO_WAIT:
1960     LOG (GNUNET_ERROR_TYPE_DEBUG,
1961          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1962          GNUNET_i2s (&socket->other_peer));
1963     return GNUNET_OK;
1964   default:
1965     break;
1966   }
1967
1968   LOG (GNUNET_ERROR_TYPE_DEBUG,
1969        "%s: Received CLOSE from %s\n",
1970        GNUNET_i2s (&socket->other_peer),
1971        GNUNET_i2s (&socket->other_peer));
1972   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1973   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1974   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
1975   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
1976   if (socket->state == STATE_CLOSED)
1977     return GNUNET_OK;
1978
1979   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1980   socket->receive_buffer = NULL;
1981   socket->receive_buffer_size = 0;
1982   return GNUNET_OK;
1983 }
1984
1985
1986 /**
1987  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1988  *
1989  * @param cls the socket (set from GNUNET_MESH_connect)
1990  * @param tunnel connection to the other end
1991  * @param tunnel_ctx this is NULL
1992  * @param sender who sent the message
1993  * @param message the actual message
1994  * @param atsi performance data for the connection
1995  * @return GNUNET_OK to keep the connection open,
1996  *         GNUNET_SYSERR to close it (signal serious error)
1997  */
1998 static int
1999 client_handle_close (void *cls,
2000                      struct GNUNET_MESH_Tunnel *tunnel,
2001                      void **tunnel_ctx,
2002                      const struct GNUNET_PeerIdentity *sender,
2003                      const struct GNUNET_MessageHeader *message,
2004                      const struct GNUNET_ATS_Information*atsi)
2005 {
2006   struct GNUNET_STREAM_Socket *socket = cls;
2007
2008   return handle_close (socket,
2009                        tunnel,
2010                        sender,
2011                        (const struct GNUNET_STREAM_MessageHeader *) message,
2012                        atsi);
2013 }
2014
2015
2016 /**
2017  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2018  *
2019  * @param cls the socket (set from GNUNET_MESH_connect)
2020  * @param tunnel connection to the other end
2021  * @param tunnel_ctx this is NULL
2022  * @param sender who sent the message
2023  * @param message the actual message
2024  * @param atsi performance data for the connection
2025  * @return GNUNET_OK to keep the connection open,
2026  *         GNUNET_SYSERR to close it (signal serious error)
2027  */
2028 static int
2029 client_handle_close_ack (void *cls,
2030                          struct GNUNET_MESH_Tunnel *tunnel,
2031                          void **tunnel_ctx,
2032                          const struct GNUNET_PeerIdentity *sender,
2033                          const struct GNUNET_MessageHeader *message,
2034                          const struct GNUNET_ATS_Information *atsi)
2035 {
2036   struct GNUNET_STREAM_Socket *socket = cls;
2037
2038   return handle_generic_close_ack (socket,
2039                                    tunnel,
2040                                    sender,
2041                                    (const struct GNUNET_STREAM_MessageHeader *) 
2042                                    message,
2043                                    atsi,
2044                                    SHUT_RDWR);
2045 }
2046
2047 /*****************************/
2048 /* Server's Message Handlers */
2049 /*****************************/
2050
2051 /**
2052  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2053  *
2054  * @param cls the closure
2055  * @param tunnel connection to the other end
2056  * @param tunnel_ctx the socket
2057  * @param sender who sent the message
2058  * @param message the actual message
2059  * @param atsi performance data for the connection
2060  * @return GNUNET_OK to keep the connection open,
2061  *         GNUNET_SYSERR to close it (signal serious error)
2062  */
2063 static int
2064 server_handle_data (void *cls,
2065                     struct GNUNET_MESH_Tunnel *tunnel,
2066                     void **tunnel_ctx,
2067                     const struct GNUNET_PeerIdentity *sender,
2068                     const struct GNUNET_MessageHeader *message,
2069                     const struct GNUNET_ATS_Information*atsi)
2070 {
2071   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2072
2073   return handle_data (socket,
2074                       tunnel,
2075                       sender,
2076                       (const struct GNUNET_STREAM_DataMessage *)message,
2077                       atsi);
2078 }
2079
2080
2081 /**
2082  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2083  *
2084  * @param cls the closure
2085  * @param tunnel connection to the other end
2086  * @param tunnel_ctx the socket
2087  * @param sender who sent the message
2088  * @param message the actual message
2089  * @param atsi performance data for the connection
2090  * @return GNUNET_OK to keep the connection open,
2091  *         GNUNET_SYSERR to close it (signal serious error)
2092  */
2093 static int
2094 server_handle_hello (void *cls,
2095                      struct GNUNET_MESH_Tunnel *tunnel,
2096                      void **tunnel_ctx,
2097                      const struct GNUNET_PeerIdentity *sender,
2098                      const struct GNUNET_MessageHeader *message,
2099                      const struct GNUNET_ATS_Information*atsi)
2100 {
2101   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2102   struct GNUNET_STREAM_HelloAckMessage *reply;
2103
2104   if (0 != memcmp (sender,
2105                    &socket->other_peer,
2106                    sizeof (struct GNUNET_PeerIdentity)))
2107   {
2108     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2109                GNUNET_i2s (&socket->other_peer));
2110     return GNUNET_YES;
2111   }
2112   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2113   GNUNET_assert (socket->tunnel == tunnel);
2114   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
2115              GNUNET_i2s (&socket->other_peer));
2116   switch (socket->status)
2117   {
2118   case STATE_INIT:
2119     reply = generate_hello_ack (socket, GNUNET_YES);
2120     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2121                    GNUNET_NO);
2122     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2123                    socket->control_retransmission_task_id);
2124     socket->control_retransmission_task_id =
2125       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2126                                     &control_retransmission_task, socket);
2127     break;
2128   case STATE_HELLO_WAIT:
2129     /* Perhaps our HELLO_ACK was lost */
2130     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2131                    socket->control_retransmission_task_id);
2132     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2133     socket->control_retransmission_task_id =
2134       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2135     break;
2136   default:
2137     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2138                GNUNET_i2s (&socket->other_peer), socket->state);
2139     /* FIXME: Send RESET? */
2140   }
2141   return GNUNET_OK;
2142 }
2143
2144
2145 /**
2146  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2147  *
2148  * @param cls the closure
2149  * @param tunnel connection to the other end
2150  * @param tunnel_ctx the socket
2151  * @param sender who sent the message
2152  * @param message the actual message
2153  * @param atsi performance data for the connection
2154  * @return GNUNET_OK to keep the connection open,
2155  *         GNUNET_SYSERR to close it (signal serious error)
2156  */
2157 static int
2158 server_handle_hello_ack (void *cls,
2159                          struct GNUNET_MESH_Tunnel *tunnel,
2160                          void **tunnel_ctx,
2161                          const struct GNUNET_PeerIdentity *sender,
2162                          const struct GNUNET_MessageHeader *message,
2163                          const struct GNUNET_ATS_Information*atsi)
2164 {
2165   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2166   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2167
2168   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2169                  ntohs (message->type));
2170   GNUNET_assert (socket->tunnel == tunnel);
2171   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2172   switch (socket->state)  
2173   {
2174   case STATE_HELLO_WAIT:
2175     LOG (GNUNET_ERROR_TYPE_DEBUG,
2176          "%s: Received HELLO_ACK from %s\n",
2177          GNUNET_i2s (&socket->other_peer),
2178          GNUNET_i2s (&socket->other_peer));
2179     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2180     LOG (GNUNET_ERROR_TYPE_DEBUG,
2181          "%s: Read sequence number %u\n",
2182          GNUNET_i2s (&socket->other_peer),
2183          (unsigned int) socket->read_sequence_number);
2184     socket->receiver_window_available = 
2185       ntohl (ack_message->receiver_window_size);
2186     set_state_established (NULL, socket);
2187     break;
2188   default:
2189     LOG (GNUNET_ERROR_TYPE_DEBUG,
2190          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2191   }
2192   return GNUNET_OK;
2193 }
2194
2195
2196 /**
2197  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2198  *
2199  * @param cls the closure
2200  * @param tunnel connection to the other end
2201  * @param tunnel_ctx the socket
2202  * @param sender who sent the message
2203  * @param message the actual message
2204  * @param atsi performance data for the connection
2205  * @return GNUNET_OK to keep the connection open,
2206  *         GNUNET_SYSERR to close it (signal serious error)
2207  */
2208 static int
2209 server_handle_reset (void *cls,
2210                      struct GNUNET_MESH_Tunnel *tunnel,
2211                      void **tunnel_ctx,
2212                      const struct GNUNET_PeerIdentity *sender,
2213                      const struct GNUNET_MessageHeader *message,
2214                      const struct GNUNET_ATS_Information*atsi)
2215 {
2216   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2217
2218   return GNUNET_OK;
2219 }
2220
2221
2222 /**
2223  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2224  *
2225  * @param cls the closure
2226  * @param tunnel connection to the other end
2227  * @param tunnel_ctx the socket
2228  * @param sender who sent the message
2229  * @param message the actual message
2230  * @param atsi performance data for the connection
2231  * @return GNUNET_OK to keep the connection open,
2232  *         GNUNET_SYSERR to close it (signal serious error)
2233  */
2234 static int
2235 server_handle_transmit_close (void *cls,
2236                               struct GNUNET_MESH_Tunnel *tunnel,
2237                               void **tunnel_ctx,
2238                               const struct GNUNET_PeerIdentity *sender,
2239                               const struct GNUNET_MessageHeader *message,
2240                               const struct GNUNET_ATS_Information*atsi)
2241 {
2242   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2243
2244   return handle_transmit_close (socket,
2245                                 tunnel,
2246                                 sender,
2247                                 (struct GNUNET_STREAM_MessageHeader *)message,
2248                                 atsi);
2249 }
2250
2251
2252 /**
2253  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2254  *
2255  * @param cls the closure
2256  * @param tunnel connection to the other end
2257  * @param tunnel_ctx the socket
2258  * @param sender who sent the message
2259  * @param message the actual message
2260  * @param atsi performance data for the connection
2261  * @return GNUNET_OK to keep the connection open,
2262  *         GNUNET_SYSERR to close it (signal serious error)
2263  */
2264 static int
2265 server_handle_transmit_close_ack (void *cls,
2266                                   struct GNUNET_MESH_Tunnel *tunnel,
2267                                   void **tunnel_ctx,
2268                                   const struct GNUNET_PeerIdentity *sender,
2269                                   const struct GNUNET_MessageHeader *message,
2270                                   const struct GNUNET_ATS_Information*atsi)
2271 {
2272   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2273
2274   return handle_generic_close_ack (socket,
2275                                    tunnel,
2276                                    sender,
2277                                    (const struct GNUNET_STREAM_MessageHeader *)
2278                                    message,
2279                                    atsi,
2280                                    SHUT_WR);
2281 }
2282
2283
2284 /**
2285  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2286  *
2287  * @param cls the closure
2288  * @param tunnel connection to the other end
2289  * @param tunnel_ctx the socket
2290  * @param sender who sent the message
2291  * @param message the actual message
2292  * @param atsi performance data for the connection
2293  * @return GNUNET_OK to keep the connection open,
2294  *         GNUNET_SYSERR to close it (signal serious error)
2295  */
2296 static int
2297 server_handle_receive_close (void *cls,
2298                              struct GNUNET_MESH_Tunnel *tunnel,
2299                              void **tunnel_ctx,
2300                              const struct GNUNET_PeerIdentity *sender,
2301                              const struct GNUNET_MessageHeader *message,
2302                              const struct GNUNET_ATS_Information*atsi)
2303 {
2304   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2305
2306   return
2307     handle_receive_close (socket,
2308                           tunnel,
2309                           sender,
2310                           (const struct GNUNET_STREAM_MessageHeader *) message,
2311                           atsi);
2312 }
2313
2314
2315 /**
2316  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2317  *
2318  * @param cls the closure
2319  * @param tunnel connection to the other end
2320  * @param tunnel_ctx the socket
2321  * @param sender who sent the message
2322  * @param message the actual message
2323  * @param atsi performance data for the connection
2324  * @return GNUNET_OK to keep the connection open,
2325  *         GNUNET_SYSERR to close it (signal serious error)
2326  */
2327 static int
2328 server_handle_receive_close_ack (void *cls,
2329                                  struct GNUNET_MESH_Tunnel *tunnel,
2330                                  void **tunnel_ctx,
2331                                  const struct GNUNET_PeerIdentity *sender,
2332                                  const struct GNUNET_MessageHeader *message,
2333                                  const struct GNUNET_ATS_Information*atsi)
2334 {
2335   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2336
2337   return handle_generic_close_ack (socket,
2338                                    tunnel,
2339                                    sender,
2340                                    (const struct GNUNET_STREAM_MessageHeader *)
2341                                    message,
2342                                    atsi,
2343                                    SHUT_RD);
2344 }
2345
2346
2347 /**
2348  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2349  *
2350  * @param cls the listen socket (from GNUNET_MESH_connect in
2351  *          GNUNET_STREAM_listen) 
2352  * @param tunnel connection to the other end
2353  * @param tunnel_ctx the socket
2354  * @param sender who sent the message
2355  * @param message the actual message
2356  * @param atsi performance data for the connection
2357  * @return GNUNET_OK to keep the connection open,
2358  *         GNUNET_SYSERR to close it (signal serious error)
2359  */
2360 static int
2361 server_handle_close (void *cls,
2362                      struct GNUNET_MESH_Tunnel *tunnel,
2363                      void **tunnel_ctx,
2364                      const struct GNUNET_PeerIdentity *sender,
2365                      const struct GNUNET_MessageHeader *message,
2366                      const struct GNUNET_ATS_Information*atsi)
2367 {
2368   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2369   
2370   return handle_close (socket,
2371                        tunnel,
2372                        sender,
2373                        (const struct GNUNET_STREAM_MessageHeader *) message,
2374                        atsi);
2375 }
2376
2377
2378 /**
2379  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2380  *
2381  * @param cls the closure
2382  * @param tunnel connection to the other end
2383  * @param tunnel_ctx the socket
2384  * @param sender who sent the message
2385  * @param message the actual message
2386  * @param atsi performance data for the connection
2387  * @return GNUNET_OK to keep the connection open,
2388  *         GNUNET_SYSERR to close it (signal serious error)
2389  */
2390 static int
2391 server_handle_close_ack (void *cls,
2392                          struct GNUNET_MESH_Tunnel *tunnel,
2393                          void **tunnel_ctx,
2394                          const struct GNUNET_PeerIdentity *sender,
2395                          const struct GNUNET_MessageHeader *message,
2396                          const struct GNUNET_ATS_Information*atsi)
2397 {
2398   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2399
2400   return handle_generic_close_ack (socket,
2401                                    tunnel,
2402                                    sender,
2403                                    (const struct GNUNET_STREAM_MessageHeader *) 
2404                                    message,
2405                                    atsi,
2406                                    SHUT_RDWR);
2407 }
2408
2409
2410 /**
2411  * Handler for DATA_ACK messages
2412  *
2413  * @param socket the socket through which the ack was received
2414  * @param tunnel connection to the other end
2415  * @param sender who sent the message
2416  * @param ack the acknowledgment message
2417  * @param atsi performance data for the connection
2418  * @return GNUNET_OK to keep the connection open,
2419  *         GNUNET_SYSERR to close it (signal serious error)
2420  */
2421 static int
2422 handle_ack (struct GNUNET_STREAM_Socket *socket,
2423             struct GNUNET_MESH_Tunnel *tunnel,
2424             const struct GNUNET_PeerIdentity *sender,
2425             const struct GNUNET_STREAM_AckMessage *ack,
2426             const struct GNUNET_ATS_Information*atsi)
2427 {
2428   unsigned int packet;
2429   int need_retransmission;
2430   uint32_t sequence_difference;
2431   
2432   if (0 != memcmp (sender,
2433                    &socket->other_peer,
2434                    sizeof (struct GNUNET_PeerIdentity)))
2435   {
2436     LOG (GNUNET_ERROR_TYPE_DEBUG,
2437          "%s: Received ACK from non-confirming peer\n",
2438          GNUNET_i2s (&socket->other_peer));
2439     return GNUNET_YES;
2440   }
2441   switch (socket->state)
2442   {
2443   case (STATE_ESTABLISHED):
2444   case (STATE_RECEIVE_CLOSED):
2445   case (STATE_RECEIVE_CLOSE_WAIT):
2446     if (NULL == socket->write_handle)
2447     {
2448       LOG (GNUNET_ERROR_TYPE_DEBUG,
2449            "%s: Received DATA_ACK when write_handle is NULL\n",
2450            GNUNET_i2s (&socket->other_peer));
2451       return GNUNET_OK;
2452     }
2453     sequence_difference = 
2454         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2455     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2456     {
2457       LOG (GNUNET_ERROR_TYPE_DEBUG,
2458            "%s: Received DATA_ACK with unexpected base sequence number\n",
2459            GNUNET_i2s (&socket->other_peer));
2460       LOG (GNUNET_ERROR_TYPE_DEBUG,
2461            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2462            GNUNET_i2s (&socket->other_peer),
2463            socket->write_sequence_number,
2464            ntohl (ack->base_sequence_number));
2465       return GNUNET_OK;
2466     }
2467     /* FIXME: include the case when write_handle is cancelled - ignore the 
2468        acks */
2469     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2470          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2471     /* Cancel the retransmission task */
2472     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2473     {
2474       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2475       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2476     }
2477     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2478     {
2479       if (NULL == socket->write_handle->messages[packet]) break;
2480       /* BS: Base sequence from ack; PS: sequence num of current packet */
2481       sequence_difference = ntohl (ack->base_sequence_number)
2482         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2483       if ((0 == sequence_difference) ||
2484           (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
2485         continue; /* The message in our handle is not yet received */
2486       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2487       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2488       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2489                             packet, GNUNET_YES);
2490     }
2491     /* Update the receive window remaining
2492        FIXME : Should update with the value from a data ack with greater
2493        sequence number */
2494     socket->receiver_window_available = 
2495       ntohl (ack->receive_window_remaining);
2496     /* Check if we have received all acknowledgements */
2497     need_retransmission = GNUNET_NO;
2498     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2499     {
2500       if (NULL == socket->write_handle->messages[packet]) break;
2501       if (GNUNET_YES != ackbitmap_is_bit_set 
2502           (&socket->write_handle->ack_bitmap,packet))
2503       {
2504         need_retransmission = GNUNET_YES;
2505         break;
2506       }
2507     }
2508     if (GNUNET_YES == need_retransmission)
2509     {
2510       write_data (socket);
2511     }
2512     else      /* We have to call the write continuation callback now */
2513     {
2514       struct GNUNET_STREAM_IOWriteHandle *write_handle;
2515       
2516       /* Free the packets */
2517       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2518       {
2519         GNUNET_free_non_null (socket->write_handle->messages[packet]);
2520       }
2521       write_handle = socket->write_handle;
2522       socket->write_handle = NULL;
2523       if (NULL != write_handle->write_cont)
2524         write_handle->write_cont (write_handle->write_cont_cls,
2525                                   socket->status,
2526                                   write_handle->size);
2527       /* We are done with the write handle - Freeing it */
2528       GNUNET_free (write_handle);
2529       LOG (GNUNET_ERROR_TYPE_DEBUG,
2530            "%s: Write completion callback completed\n",
2531            GNUNET_i2s (&socket->other_peer));      
2532     }
2533     break;
2534   default:
2535     break;
2536   }
2537   return GNUNET_OK;
2538 }
2539
2540
2541 /**
2542  * Handler for DATA_ACK messages
2543  *
2544  * @param cls the 'struct GNUNET_STREAM_Socket'
2545  * @param tunnel connection to the other end
2546  * @param tunnel_ctx unused
2547  * @param sender who sent the message
2548  * @param message the actual message
2549  * @param atsi performance data for the connection
2550  * @return GNUNET_OK to keep the connection open,
2551  *         GNUNET_SYSERR to close it (signal serious error)
2552  */
2553 static int
2554 client_handle_ack (void *cls,
2555                    struct GNUNET_MESH_Tunnel *tunnel,
2556                    void **tunnel_ctx,
2557                    const struct GNUNET_PeerIdentity *sender,
2558                    const struct GNUNET_MessageHeader *message,
2559                    const struct GNUNET_ATS_Information*atsi)
2560 {
2561   struct GNUNET_STREAM_Socket *socket = cls;
2562   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2563  
2564   return handle_ack (socket, tunnel, sender, ack, atsi);
2565 }
2566
2567
2568 /**
2569  * Handler for DATA_ACK messages
2570  *
2571  * @param cls the server's listen socket
2572  * @param tunnel connection to the other end
2573  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2574  * @param sender who sent the message
2575  * @param message the actual message
2576  * @param atsi performance data for the connection
2577  * @return GNUNET_OK to keep the connection open,
2578  *         GNUNET_SYSERR to close it (signal serious error)
2579  */
2580 static int
2581 server_handle_ack (void *cls,
2582                    struct GNUNET_MESH_Tunnel *tunnel,
2583                    void **tunnel_ctx,
2584                    const struct GNUNET_PeerIdentity *sender,
2585                    const struct GNUNET_MessageHeader *message,
2586                    const struct GNUNET_ATS_Information*atsi)
2587 {
2588   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2589   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2590  
2591   return handle_ack (socket, tunnel, sender, ack, atsi);
2592 }
2593
2594
2595 /**
2596  * For client message handlers, the stream socket is in the
2597  * closure argument.
2598  */
2599 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2600   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2601   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2602    sizeof (struct GNUNET_STREAM_AckMessage) },
2603   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2604    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2605   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2606    sizeof (struct GNUNET_STREAM_MessageHeader)},
2607   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2608    sizeof (struct GNUNET_STREAM_MessageHeader)},
2609   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2610    sizeof (struct GNUNET_STREAM_MessageHeader)},
2611   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2612    sizeof (struct GNUNET_STREAM_MessageHeader)},
2613   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2614    sizeof (struct GNUNET_STREAM_MessageHeader)},
2615   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2616    sizeof (struct GNUNET_STREAM_MessageHeader)},
2617   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2618    sizeof (struct GNUNET_STREAM_MessageHeader)},
2619   {NULL, 0, 0}
2620 };
2621
2622
2623 /**
2624  * For server message handlers, the stream socket is in the
2625  * tunnel context, and the listen socket in the closure argument.
2626  */
2627 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2628   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2629   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2630    sizeof (struct GNUNET_STREAM_AckMessage) },
2631   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2632    sizeof (struct GNUNET_STREAM_MessageHeader)},
2633   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2634    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2635   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2636    sizeof (struct GNUNET_STREAM_MessageHeader)},
2637   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2638    sizeof (struct GNUNET_STREAM_MessageHeader)},
2639   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2640    sizeof (struct GNUNET_STREAM_MessageHeader)},
2641   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2642    sizeof (struct GNUNET_STREAM_MessageHeader)},
2643   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2644    sizeof (struct GNUNET_STREAM_MessageHeader)},
2645   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2646    sizeof (struct GNUNET_STREAM_MessageHeader)},
2647   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2648    sizeof (struct GNUNET_STREAM_MessageHeader)},
2649   {NULL, 0, 0}
2650 };
2651
2652
2653 /**
2654  * Function called when our target peer is connected to our tunnel
2655  *
2656  * @param cls the socket for which this tunnel is created
2657  * @param peer the peer identity of the target
2658  * @param atsi performance data for the connection
2659  */
2660 static void
2661 mesh_peer_connect_callback (void *cls,
2662                             const struct GNUNET_PeerIdentity *peer,
2663                             const struct GNUNET_ATS_Information * atsi)
2664 {
2665   struct GNUNET_STREAM_Socket *socket = cls;
2666   struct GNUNET_STREAM_MessageHeader *message;
2667   
2668   if (0 != memcmp (peer,
2669                    &socket->other_peer,
2670                    sizeof (struct GNUNET_PeerIdentity)))
2671   {
2672     LOG (GNUNET_ERROR_TYPE_DEBUG,
2673          "%s: A peer which is not our target has connected to our tunnel\n",
2674          GNUNET_i2s(peer));
2675     return;
2676   }
2677   LOG (GNUNET_ERROR_TYPE_DEBUG,
2678        "%s: Target peer %s connected\n",
2679        GNUNET_i2s (&socket->other_peer),
2680        GNUNET_i2s (&socket->other_peer));
2681   /* Set state to INIT */
2682   socket->state = STATE_INIT;
2683   /* Send HELLO message */
2684   message = generate_hello ();
2685   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2686   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2687                  socket->control_retransmission_task_id);
2688   socket->control_retransmission_task_id =
2689     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2690                                   &control_retransmission_task, socket);
2691 }
2692
2693
2694 /**
2695  * Function called when our target peer is disconnected from our tunnel
2696  *
2697  * @param cls the socket associated which this tunnel
2698  * @param peer the peer identity of the target
2699  */
2700 static void
2701 mesh_peer_disconnect_callback (void *cls,
2702                                const struct GNUNET_PeerIdentity *peer)
2703 {
2704   struct GNUNET_STREAM_Socket *socket=cls;
2705   
2706   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2707   LOG (GNUNET_ERROR_TYPE_DEBUG,
2708        "%s: Other peer %s disconnected \n",
2709        GNUNET_i2s (&socket->other_peer),
2710        GNUNET_i2s (&socket->other_peer));
2711 }
2712
2713
2714 /**
2715  * Method called whenever a peer creates a tunnel to us
2716  *
2717  * @param cls closure
2718  * @param tunnel new handle to the tunnel
2719  * @param initiator peer that started the tunnel
2720  * @param atsi performance information for the tunnel
2721  * @return initial tunnel context for the tunnel
2722  *         (can be NULL -- that's not an error)
2723  */
2724 static void *
2725 new_tunnel_notify (void *cls,
2726                    struct GNUNET_MESH_Tunnel *tunnel,
2727                    const struct GNUNET_PeerIdentity *initiator,
2728                    const struct GNUNET_ATS_Information *atsi)
2729 {
2730   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2731   struct GNUNET_STREAM_Socket *socket;
2732
2733   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2734      from the same peer again until the socket is closed */
2735
2736   if (GNUNET_NO == lsocket->listening)
2737   {
2738     GNUNET_MESH_tunnel_destroy (tunnel);
2739     return NULL;
2740   }
2741   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2742   socket->other_peer = *initiator;
2743   socket->tunnel = tunnel;
2744   socket->session_id = 0;       /* FIXME */
2745   socket->state = STATE_INIT;
2746   socket->lsocket = lsocket;
2747   socket->retransmit_timeout = lsocket->retransmit_timeout;
2748   socket->testing_active = lsocket->testing_active;
2749   socket->testing_set_write_sequence_number_value =
2750       lsocket->testing_set_write_sequence_number_value;
2751   socket->max_payload_size = lsocket->max_payload_size;
2752   LOG (GNUNET_ERROR_TYPE_DEBUG,
2753        "%s: Peer %s initiated tunnel to us\n", 
2754        GNUNET_i2s (&socket->other_peer),
2755        GNUNET_i2s (&socket->other_peer));
2756   /* FIXME: Copy MESH handle from lsocket to socket */
2757   return socket;
2758 }
2759
2760
2761 /**
2762  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2763  * any associated state.  This function is NOT called if the client has
2764  * explicitly asked for the tunnel to be destroyed using
2765  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2766  * the tunnel.
2767  *
2768  * @param cls closure (set from GNUNET_MESH_connect)
2769  * @param tunnel connection to the other end (henceforth invalid)
2770  * @param tunnel_ctx place where local state associated
2771  *                   with the tunnel is stored
2772  */
2773 static void 
2774 tunnel_cleaner (void *cls,
2775                 const struct GNUNET_MESH_Tunnel *tunnel,
2776                 void *tunnel_ctx)
2777 {
2778   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2779
2780   if (tunnel != socket->tunnel)
2781     return;
2782
2783   GNUNET_break_op(0);
2784   LOG (GNUNET_ERROR_TYPE_DEBUG,
2785        "%s: Peer %s has terminated connection abruptly\n",
2786        GNUNET_i2s (&socket->other_peer),
2787        GNUNET_i2s (&socket->other_peer));
2788
2789   socket->status = GNUNET_STREAM_SHUTDOWN;
2790
2791   /* Clear Transmit handles */
2792   if (NULL != socket->transmit_handle)
2793   {
2794     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2795     socket->transmit_handle = NULL;
2796   }
2797   /* Stop Tasks using socket->tunnel */
2798   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2799   {
2800     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2801     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2802   }
2803   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2804   {
2805     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2806     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2807   }
2808   /* FIXME: Cancel all other tasks using socket->tunnel */
2809   socket->tunnel = NULL;
2810 }
2811
2812
2813 /**
2814  * Callback to signal timeout on lockmanager lock acquire
2815  *
2816  * @param cls the ListenSocket
2817  * @param tc the scheduler task context
2818  */
2819 static void
2820 lockmanager_acquire_timeout (void *cls, 
2821                              const struct GNUNET_SCHEDULER_TaskContext *tc)
2822 {
2823   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2824   GNUNET_STREAM_ListenCallback listen_cb;
2825   void *listen_cb_cls;
2826
2827   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2828   listen_cb = lsocket->listen_cb;
2829   listen_cb_cls = lsocket->listen_cb_cls;
2830   GNUNET_STREAM_listen_close (lsocket);
2831   if (NULL != listen_cb)
2832     listen_cb (listen_cb_cls, NULL, NULL);  
2833 }
2834
2835
2836 /**
2837  * Callback to notify us on the status changes on app_port lock
2838  *
2839  * @param cls the ListenSocket
2840  * @param domain the domain name of the lock
2841  * @param lock the app_port
2842  * @param status the current status of the lock
2843  */
2844 static void
2845 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2846                        enum GNUNET_LOCKMANAGER_Status status)
2847 {
2848   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2849
2850   GNUNET_assert (lock == (uint32_t) lsocket->port);
2851   if (GNUNET_LOCKMANAGER_SUCCESS == status)
2852   {
2853     lsocket->listening = GNUNET_YES;
2854     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2855     {
2856       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2857       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2858     }
2859     if (NULL == lsocket->mesh)
2860     {
2861       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
2862
2863       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2864                                            lsocket, /* Closure */
2865                                            &new_tunnel_notify,
2866                                            &tunnel_cleaner,
2867                                            server_message_handlers,
2868                                            ports);
2869       GNUNET_assert (NULL != lsocket->mesh);
2870       if (NULL != lsocket->listen_ok_cb)
2871       {
2872         (void) lsocket->listen_ok_cb ();
2873       }
2874     }
2875   }
2876   if (GNUNET_LOCKMANAGER_RELEASE == status)
2877     lsocket->listening = GNUNET_NO;
2878 }
2879
2880
2881 /*****************/
2882 /* API functions */
2883 /*****************/
2884
2885
2886 /**
2887  * Tries to open a stream to the target peer
2888  *
2889  * @param cfg configuration to use
2890  * @param target the target peer to which the stream has to be opened
2891  * @param app_port the application port number which uniquely identifies this
2892  *            stream
2893  * @param open_cb this function will be called after stream has be established;
2894  *          cannot be NULL
2895  * @param open_cb_cls the closure for open_cb
2896  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2897  * @return if successful it returns the stream socket; NULL if stream cannot be
2898  *         opened 
2899  */
2900 struct GNUNET_STREAM_Socket *
2901 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2902                     const struct GNUNET_PeerIdentity *target,
2903                     GNUNET_MESH_ApplicationType app_port,
2904                     GNUNET_STREAM_OpenCallback open_cb,
2905                     void *open_cb_cls,
2906                     ...)
2907 {
2908   struct GNUNET_STREAM_Socket *socket;
2909   enum GNUNET_STREAM_Option option;
2910   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2911   va_list vargs;
2912   uint16_t payload_size;
2913
2914   LOG (GNUNET_ERROR_TYPE_DEBUG,
2915        "%s\n", __func__);
2916   GNUNET_assert (NULL != open_cb);
2917   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2918   socket->other_peer = *target;
2919   socket->open_cb = open_cb;
2920   socket->open_cls = open_cb_cls;
2921   /* Set defaults */
2922   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2923   socket->testing_active = GNUNET_NO;
2924   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2925   va_start (vargs, open_cb_cls); /* Parse variable args */
2926   do {
2927     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2928     switch (option)
2929     {
2930     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2931       /* Expect struct GNUNET_TIME_Relative */
2932       socket->retransmit_timeout = va_arg (vargs,
2933                                            struct GNUNET_TIME_Relative);
2934       break;
2935     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2936       socket->testing_active = GNUNET_YES;
2937       socket->testing_set_write_sequence_number_value = va_arg (vargs,
2938                                                                 uint32_t);
2939       break;
2940     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2941       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2942       break;
2943     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2944       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
2945       break;
2946     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
2947       payload_size = (uint16_t) va_arg (vargs, unsigned int);
2948       GNUNET_assert (0 != payload_size);
2949       if (payload_size < socket->max_payload_size)
2950         socket->max_payload_size = payload_size;
2951       break;
2952     case GNUNET_STREAM_OPTION_END:
2953       break;
2954     }
2955   } while (GNUNET_STREAM_OPTION_END != option);
2956   va_end (vargs);               /* End of variable args parsing */
2957   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2958                                       socket, /* cls */
2959                                       NULL, /* No inbound tunnel handler */
2960                                       NULL, /* No in-tunnel cleaner */
2961                                       client_message_handlers,
2962                                       ports); /* We don't get inbound tunnels */
2963   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2964   {
2965     GNUNET_free (socket);
2966     return NULL;
2967   }
2968   /* Now create the mesh tunnel to target */
2969   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2970   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2971                                               NULL, /* Tunnel context */
2972                                               &mesh_peer_connect_callback,
2973                                               &mesh_peer_disconnect_callback,
2974                                               socket);
2975   GNUNET_assert (NULL != socket->tunnel);
2976   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2977                                         &socket->other_peer);
2978   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2979   return socket;
2980 }
2981
2982
2983 /**
2984  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
2985  *
2986  * @param socket the stream socket
2987  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
2988  * @param completion_cb the callback that will be called upon successful
2989  *          shutdown of given operation
2990  * @param completion_cls the closure for the completion callback
2991  * @return the shutdown handle
2992  */
2993 struct GNUNET_STREAM_ShutdownHandle *
2994 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2995                         int operation,
2996                         GNUNET_STREAM_ShutdownCompletion completion_cb,
2997                         void *completion_cls)
2998 {
2999   struct GNUNET_STREAM_ShutdownHandle *handle;
3000   struct GNUNET_STREAM_MessageHeader *msg;
3001   
3002   GNUNET_assert (NULL == socket->shutdown_handle);
3003
3004   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3005   handle->socket = socket;
3006   handle->completion_cb = completion_cb;
3007   handle->completion_cls = completion_cls;
3008   socket->shutdown_handle = handle;
3009
3010   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
3011   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
3012   switch (operation)
3013   {
3014   case SHUT_RD:
3015     handle->operation = SHUT_RD;
3016     if (NULL != socket->read_handle)
3017       LOG (GNUNET_ERROR_TYPE_WARNING,
3018            "Existing read handle should be cancelled before shutting"
3019            " down reading\n");
3020     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3021     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3022                    GNUNET_NO);
3023     break;
3024   case SHUT_WR:
3025     handle->operation = SHUT_WR;
3026     if (NULL != socket->write_handle)
3027       LOG (GNUNET_ERROR_TYPE_WARNING,
3028            "Existing write handle should be cancelled before shutting"
3029            " down writing\n");
3030     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3031     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3032                    GNUNET_NO);
3033     break;
3034   case SHUT_RDWR:
3035     handle->operation = SHUT_RDWR;
3036     if (NULL != socket->write_handle)
3037       LOG (GNUNET_ERROR_TYPE_WARNING,
3038            "Existing write handle should be cancelled before shutting"
3039            " down writing\n");
3040     if (NULL != socket->read_handle)
3041       LOG (GNUNET_ERROR_TYPE_WARNING,
3042            "Existing read handle should be cancelled before shutting"
3043            " down reading\n");
3044     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3045     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3046     break;
3047   default:
3048     LOG (GNUNET_ERROR_TYPE_WARNING,
3049          "GNUNET_STREAM_shutdown called with invalid value for "
3050          "parameter operation -- Ignoring\n");
3051     GNUNET_free (msg);
3052     GNUNET_free (handle);
3053     return NULL;
3054   }
3055   handle->close_msg_retransmission_task_id =
3056     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3057                                   &close_msg_retransmission_task,
3058                                   handle);
3059   return handle;
3060 }
3061
3062
3063 /**
3064  * Cancels a pending shutdown
3065  *
3066  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3067  */
3068 void
3069 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3070 {
3071   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3072     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3073   handle->socket->shutdown_handle = NULL;
3074   GNUNET_free (handle);
3075 }
3076
3077
3078 /**
3079  * Closes the stream
3080  *
3081  * @param socket the stream socket
3082  */
3083 void
3084 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3085 {
3086   struct MessageQueue *head;
3087
3088   if (NULL != socket->read_handle)
3089   {
3090     LOG (GNUNET_ERROR_TYPE_WARNING,
3091          "Closing STREAM socket when a read handle is pending\n");
3092     GNUNET_STREAM_io_read_cancel (socket->read_handle);
3093   }
3094   if (NULL != socket->write_handle)
3095   {
3096     LOG (GNUNET_ERROR_TYPE_WARNING,
3097          "Closing STREAM socket when a write handle is pending\n");
3098     GNUNET_STREAM_io_write_cancel (socket->write_handle);
3099     //socket->write_handle = NULL;
3100   }
3101   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3102   {
3103     /* socket closed with read task pending!? */
3104     GNUNET_break (0);
3105     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3106     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3107   }  
3108   /* Terminate the ack'ing tasks if they are still present */
3109   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3110   {
3111     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3112     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3113   }
3114   /* Terminate the control retransmission tasks */
3115   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3116   {
3117     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3118   }
3119   /* Clear Transmit handles */
3120   if (NULL != socket->transmit_handle)
3121   {
3122     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3123     socket->transmit_handle = NULL;
3124   }
3125   /* Clear existing message queue */
3126   while (NULL != (head = socket->queue_head)) {
3127     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3128                                  socket->queue_tail,
3129                                  head);
3130     GNUNET_free (head->message);
3131     GNUNET_free (head);
3132   }
3133   /* Close associated tunnel */
3134   if (NULL != socket->tunnel)
3135   {
3136     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3137     socket->tunnel = NULL;
3138   }
3139   /* Close mesh connection */
3140   if (NULL != socket->mesh && NULL == socket->lsocket)
3141   {
3142     GNUNET_MESH_disconnect (socket->mesh);
3143     socket->mesh = NULL;
3144   }  
3145   /* Release receive buffer */
3146   if (NULL != socket->receive_buffer)
3147   {
3148     GNUNET_free (socket->receive_buffer);
3149   }
3150   GNUNET_free (socket);
3151 }
3152
3153
3154 /**
3155  * Listens for stream connections for a specific application ports
3156  *
3157  * @param cfg the configuration to use
3158  * @param app_port the application port for which new streams will be accepted
3159  * @param listen_cb this function will be called when a peer tries to establish
3160  *            a stream with us
3161  * @param listen_cb_cls closure for listen_cb
3162  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3163  * @return listen socket, NULL for any error
3164  */
3165 struct GNUNET_STREAM_ListenSocket *
3166 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3167                       GNUNET_MESH_ApplicationType app_port,
3168                       GNUNET_STREAM_ListenCallback listen_cb,
3169                       void *listen_cb_cls,
3170                       ...)
3171 {
3172   /* FIXME: Add variable args for passing configration options? */
3173   struct GNUNET_STREAM_ListenSocket *lsocket;
3174   struct GNUNET_TIME_Relative listen_timeout;
3175   enum GNUNET_STREAM_Option option;
3176   va_list vargs;
3177   uint16_t payload_size;
3178
3179   GNUNET_assert (NULL != listen_cb);
3180   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3181   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3182   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3183   if (NULL == lsocket->lockmanager)
3184   {
3185     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3186     GNUNET_free (lsocket);
3187     return NULL;
3188   }
3189   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3190   /* Set defaults */
3191   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3192   lsocket->testing_active = GNUNET_NO;
3193   lsocket->listen_ok_cb = NULL;
3194   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3195   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3196   va_start (vargs, listen_cb_cls);
3197   do {
3198     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3199     switch (option)
3200     {
3201     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3202       lsocket->retransmit_timeout = va_arg (vargs,
3203                                             struct GNUNET_TIME_Relative);
3204       break;
3205     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3206       lsocket->testing_active = GNUNET_YES;
3207       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3208                                                                  uint32_t);
3209       break;
3210     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3211       listen_timeout = GNUNET_TIME_relative_multiply
3212         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3213       break;
3214     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3215       lsocket->listen_ok_cb = va_arg (vargs,
3216                                       GNUNET_STREAM_ListenSuccessCallback);
3217       break;
3218     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3219       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3220       GNUNET_assert (0 != payload_size);
3221       if (payload_size < lsocket->max_payload_size)
3222         lsocket->max_payload_size = payload_size;
3223       break;
3224     case GNUNET_STREAM_OPTION_END:
3225       break;
3226     }
3227   } while (GNUNET_STREAM_OPTION_END != option);
3228   va_end (vargs);
3229   lsocket->port = app_port;
3230   lsocket->listen_cb = listen_cb;
3231   lsocket->listen_cb_cls = listen_cb_cls;
3232   lsocket->locking_request = 
3233     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3234                                      (uint32_t) lsocket->port,
3235                                      &lock_status_change_cb, lsocket);
3236   lsocket->lockmanager_acquire_timeout_task =
3237     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3238                                   &lockmanager_acquire_timeout, lsocket);
3239   return lsocket;
3240 }
3241
3242
3243 /**
3244  * Closes the listen socket
3245  *
3246  * @param lsocket the listen socket
3247  */
3248 void
3249 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3250 {
3251   /* Close MESH connection */
3252   if (NULL != lsocket->mesh)
3253     GNUNET_MESH_disconnect (lsocket->mesh);
3254   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3255   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3256     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3257   if (NULL != lsocket->locking_request)
3258     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3259   if (NULL != lsocket->lockmanager)
3260     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3261   GNUNET_free (lsocket);
3262 }
3263
3264
3265 /**
3266  * Tries to write the given data to the stream. The maximum size of data that
3267  * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3268  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3269  * violation, however only the said number of maximum bytes will be written.
3270  *
3271  * @param socket the socket representing a stream
3272  * @param data the data buffer from where the data is written into the stream
3273  * @param size the number of bytes to be written from the data buffer
3274  * @param timeout the timeout period
3275  * @param write_cont the function to call upon writing some bytes into the
3276  *          stream 
3277  * @param write_cont_cls the closure
3278  *
3279  * @return handle to cancel the operation; if a previous write is pending or
3280  *           the stream has been shutdown for this operation then write_cont is
3281  *           immediately called and NULL is returned.
3282  */
3283 struct GNUNET_STREAM_IOWriteHandle *
3284 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3285                      const void *data,
3286                      size_t size,
3287                      struct GNUNET_TIME_Relative timeout,
3288                      GNUNET_STREAM_CompletionContinuation write_cont,
3289                      void *write_cont_cls)
3290 {
3291   struct GNUNET_STREAM_IOWriteHandle *io_handle;
3292   struct GNUNET_STREAM_DataMessage *data_msg;
3293   const void *sweep;
3294   struct GNUNET_TIME_Relative ack_deadline;
3295   unsigned int num_needed_packets;
3296   unsigned int packet;
3297   uint32_t packet_size;
3298   uint32_t payload_size;
3299   uint16_t max_data_packet_size;
3300
3301   LOG (GNUNET_ERROR_TYPE_DEBUG,
3302        "%s\n", __func__);
3303   if (NULL != socket->write_handle)
3304   {
3305     GNUNET_break (0);
3306     return NULL;
3307   }
3308   switch (socket->state)
3309   {
3310   case STATE_TRANSMIT_CLOSED:
3311   case STATE_TRANSMIT_CLOSE_WAIT:
3312   case STATE_CLOSED:
3313   case STATE_CLOSE_WAIT:
3314     if (NULL != write_cont)
3315       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3316     LOG (GNUNET_ERROR_TYPE_DEBUG,
3317          "%s() END\n", __func__);
3318     return NULL;
3319   case STATE_INIT:
3320   case STATE_LISTEN:
3321   case STATE_HELLO_WAIT:
3322     if (NULL != write_cont)
3323       /* FIXME: GNUNET_STREAM_SYSERR?? */
3324       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3325     LOG (GNUNET_ERROR_TYPE_DEBUG,
3326          "%s() END\n", __func__);
3327     return NULL;
3328   case STATE_ESTABLISHED:
3329   case STATE_RECEIVE_CLOSED:
3330   case STATE_RECEIVE_CLOSE_WAIT:
3331     break;
3332   }
3333   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3334     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3335   num_needed_packets =
3336       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3337   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
3338   io_handle->socket = socket;
3339   io_handle->write_cont = write_cont;
3340   io_handle->write_cont_cls = write_cont_cls;
3341   io_handle->size = size;
3342   sweep = data;
3343   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3344      determined from RTT */
3345   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3346   /* Divide the given buffer into packets for sending */
3347   max_data_packet_size =
3348       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3349   for (packet=0; packet < num_needed_packets; packet++)
3350   {
3351     if ((packet + 1) * socket->max_payload_size < size) 
3352     {
3353       payload_size = socket->max_payload_size;
3354       packet_size = max_data_packet_size;
3355     }
3356     else 
3357     {
3358       payload_size = size - packet * socket->max_payload_size;
3359       packet_size = 
3360           payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3361     }
3362     io_handle->messages[packet] = GNUNET_malloc (packet_size);
3363     io_handle->messages[packet]->header.header.size = htons (packet_size);
3364     io_handle->messages[packet]->header.header.type =
3365       htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3366     io_handle->messages[packet]->sequence_number =
3367       htonl (socket->write_sequence_number++);
3368     io_handle->messages[packet]->offset = htonl (socket->write_offset);
3369     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3370        determined from RTT */
3371     io_handle->messages[packet]->ack_deadline =
3372       GNUNET_TIME_relative_hton (ack_deadline);
3373     data_msg = io_handle->messages[packet];
3374     /* Copy data from given buffer to the packet */
3375     memcpy (&data_msg[1], sweep, payload_size);
3376     sweep += payload_size;
3377     socket->write_offset += payload_size;
3378   }
3379   /* ack the last data message. FIXME: remove when we figure out how to do this
3380      using RTT */
3381   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3382       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3383   socket->write_handle = io_handle;
3384   write_data (socket);
3385   LOG (GNUNET_ERROR_TYPE_DEBUG,
3386        "%s() END\n", __func__);
3387   return io_handle;
3388 }
3389
3390
3391 /**
3392  * Tries to read data from the stream.
3393  *
3394  * @param socket the socket representing a stream
3395  * @param timeout the timeout period
3396  * @param proc function to call with data (once only)
3397  * @param proc_cls the closure for proc
3398  *
3399  * @return handle to cancel the operation; if the stream has been shutdown for
3400  *           this type of opeartion then the DataProcessor is immediately
3401  *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
3402  */
3403 struct GNUNET_STREAM_IOReadHandle *
3404 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3405                     struct GNUNET_TIME_Relative timeout,
3406                     GNUNET_STREAM_DataProcessor proc,
3407                     void *proc_cls)
3408 {
3409   struct GNUNET_STREAM_IOReadHandle *read_handle;
3410   
3411   LOG (GNUNET_ERROR_TYPE_DEBUG,
3412        "%s: %s()\n", 
3413        GNUNET_i2s (&socket->other_peer),
3414        __func__);
3415   /* Return NULL if there is already a read handle; the user has to cancel that
3416      first before continuing or has to wait until it is completed */
3417   if (NULL != socket->read_handle) 
3418     return NULL;
3419   GNUNET_assert (NULL != proc);
3420   switch (socket->state)
3421   {
3422   case STATE_RECEIVE_CLOSED:
3423   case STATE_RECEIVE_CLOSE_WAIT:
3424   case STATE_CLOSED:
3425   case STATE_CLOSE_WAIT:
3426     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3427     LOG (GNUNET_ERROR_TYPE_DEBUG,
3428          "%s: %s() END\n",
3429          GNUNET_i2s (&socket->other_peer),
3430          __func__);
3431     return NULL;
3432   default:
3433     break;
3434   }
3435   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3436   read_handle->proc = proc;
3437   read_handle->proc_cls = proc_cls;
3438   read_handle->socket = socket;
3439   socket->read_handle = read_handle;
3440   /* Check if we have a packet at bitmap 0 */
3441   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3442                                           0))
3443   {
3444     socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3445                                                      socket);   
3446   }
3447   /* Setup the read timeout task */
3448   socket->read_io_timeout_task_id =
3449     GNUNET_SCHEDULER_add_delayed (timeout,
3450                                   &read_io_timeout,
3451                                   socket);
3452   LOG (GNUNET_ERROR_TYPE_DEBUG,
3453        "%s: %s() END\n",
3454        GNUNET_i2s (&socket->other_peer),
3455        __func__);
3456   return read_handle;
3457 }
3458
3459
3460 /**
3461  * Cancel pending write operation.
3462  *
3463  * @param ioh handle to operation to cancel
3464  */
3465 void
3466 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3467 {
3468   struct GNUNET_STREAM_Socket *socket = ioh->socket;
3469   unsigned int packet;
3470
3471   GNUNET_assert (NULL != socket->write_handle);
3472   GNUNET_assert (socket->write_handle == ioh);
3473
3474   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3475   {
3476     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3477     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3478   }
3479
3480   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3481   {
3482     if (NULL == ioh->messages[packet]) break;
3483     GNUNET_free (ioh->messages[packet]);
3484   }
3485       
3486   GNUNET_free (socket->write_handle);
3487   socket->write_handle = NULL;
3488 }
3489
3490
3491 /**
3492  * Cancel pending read operation.
3493  *
3494  * @param ioh handle to operation to cancel
3495  */
3496 void
3497 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3498 {
3499   struct GNUNET_STREAM_Socket *socket;
3500   
3501   socket = ioh->socket;
3502   GNUNET_assert (NULL != socket->read_handle);
3503   GNUNET_assert (ioh == socket->read_handle);
3504   /* Read io time task should be there; if it is already executed then this
3505   read handle is not valid */
3506   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
3507   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3508   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3509   /* reading task may be present; if so we have to stop it */
3510   if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
3511   {
3512     GNUNET_SCHEDULER_cancel (socket->read_task_id);
3513     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3514   }
3515   GNUNET_free (ioh);
3516   socket->read_handle = NULL;
3517 }
3518
3519 /* end of stream_api.c */