changes
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Add code for write io timeout
27  *
28  * Include retransmission for control messages
29  **/
30
31 /**
32  * @file stream/stream_api.c
33  * @brief Implementation of the stream library
34  * @author Sree Harsha Totakura
35  */
36
37 #include "platform.h"
38 #include "gnunet_common.h"
39 #include "gnunet_util_lib.h"
40 #include "gnunet_lockmanager_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_stream_lib.h"
43 #include "stream.h"
44
45 /**
46  * Generic logging shorthand
47  */
48 #define LOG(kind,...)                                   \
49   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
50
51 /**
52  * Debug logging shorthand
53  */
54 #define LOG_DEBUG(...)                          \
55   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
56
57 /**
58  * Time in relative seconds shorthand
59  */
60 #define TIME_REL_SECS(sec) \
61   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
62
63 /**
64  * The maximum packet size of a stream packet
65  */
66 #define DEFAULT_MAX_PAYLOAD_SIZE 64000
67
68 /**
69  * Receive buffer
70  */
71 #define RECEIVE_BUFFER_SIZE 4096000
72
73 /**
74  * states in the Protocol
75  */
76 enum State
77   {
78     /**
79      * Client initialization state
80      */
81     STATE_INIT,
82
83     /**
84      * Listener initialization state 
85      */
86     STATE_LISTEN,
87
88     /**
89      * Pre-connection establishment state
90      */
91     STATE_HELLO_WAIT,
92
93     /**
94      * State where a connection has been established
95      */
96     STATE_ESTABLISHED,
97
98     /**
99      * State where the socket is closed on our side and waiting to be ACK'ed
100      */
101     STATE_RECEIVE_CLOSE_WAIT,
102
103     /**
104      * State where the socket is closed for reading
105      */
106     STATE_RECEIVE_CLOSED,
107
108     /**
109      * State where the socket is closed on our side and waiting to be ACK'ed
110      */
111     STATE_TRANSMIT_CLOSE_WAIT,
112
113     /**
114      * State where the socket is closed for writing
115      */
116     STATE_TRANSMIT_CLOSED,
117
118     /**
119      * State where the socket is closed on our side and waiting to be ACK'ed
120      */
121     STATE_CLOSE_WAIT,
122
123     /**
124      * State where the socket is closed
125      */
126     STATE_CLOSED 
127   };
128
129
130 /**
131  * Functions of this type are called when a message is written
132  *
133  * @param cls the closure from queue_message
134  * @param socket the socket the written message was bound to
135  */
136 typedef void (*SendFinishCallback) (void *cls,
137                                     struct GNUNET_STREAM_Socket *socket);
138
139
140 /**
141  * The send message queue
142  */
143 struct MessageQueue
144 {
145   /**
146    * The message
147    */
148   struct GNUNET_MessageHeader *message;
149
150   /**
151    * Callback to be called when the message is sent
152    */
153   SendFinishCallback finish_cb;
154
155   /**
156    * The closure for finish_cb
157    */
158   void *finish_cb_cls;
159
160   /**
161    * The next message in queue. Should be NULL in the last message
162    */
163   struct MessageQueue *next;
164
165   /**
166    * The next message in queue. Should be NULL in the first message
167    */
168   struct MessageQueue *prev;
169 };
170
171
172 /**
173  * The STREAM Socket Handler
174  */
175 struct GNUNET_STREAM_Socket
176 {
177   /**
178    * The mesh handle
179    */
180   struct GNUNET_MESH_Handle *mesh;
181
182   /**
183    * Handle to statistics
184    */
185   struct GNUNET_STATISTICS_Handle *stat_handle;
186
187   /**
188    * The mesh tunnel handle
189    */
190   struct GNUNET_MESH_Tunnel *tunnel;
191
192   /**
193    * Stream open closure
194    */
195   void *open_cls;
196
197   /**
198    * Stream open callback
199    */
200   GNUNET_STREAM_OpenCallback open_cb;
201
202   /**
203    * The current transmit handle (if a pending transmit request exists)
204    */
205   struct GNUNET_MESH_TransmitHandle *transmit_handle;
206
207   /**
208    * The current message associated with the transmit handle
209    */
210   struct MessageQueue *queue_head;
211
212   /**
213    * The queue tail, should always point to the last message in queue
214    */
215   struct MessageQueue *queue_tail;
216
217   /**
218    * The write IO_handle associated with this socket
219    */
220   struct GNUNET_STREAM_WriteHandle *write_handle;
221
222   /**
223    * The read IO_handle associated with this socket
224    */
225   struct GNUNET_STREAM_ReadHandle *read_handle;
226
227   /**
228    * The shutdown handle associated with this socket
229    */
230   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
231
232   /**
233    * Buffer for storing received messages
234    */
235   void *receive_buffer;
236
237   /**
238    * The listen socket from which this socket is derived. Should be NULL if it
239    * is not a derived socket
240    */
241   struct GNUNET_STREAM_ListenSocket *lsocket;
242
243   /**
244    * The peer identity of the peer at the other end of the stream
245    */
246   struct GNUNET_PeerIdentity other_peer;
247
248   /**
249    * The Acknowledgement Bitmap
250    */
251   GNUNET_STREAM_AckBitmap ack_bitmap;
252
253   /**
254    * Task identifier for retransmission task after timeout
255    */
256   GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
257
258   /**
259    * Task identifier for retransmission of control messages
260    */
261   GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
262
263   /**
264    * The task for sending timely Acks
265    */
266   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
267
268   /**
269    * Retransmission timeout
270    */
271   struct GNUNET_TIME_Relative retransmit_timeout;
272
273   /**
274    * Time when the Acknowledgement was queued
275    */
276   struct GNUNET_TIME_Absolute ack_time_registered;
277
278   /**
279    * Queued Acknowledgement deadline
280    */
281   struct GNUNET_TIME_Relative ack_time_deadline;
282
283   /**
284    * Mesh transmit timeout
285    */
286   struct GNUNET_TIME_Relative mesh_retry_timeout;
287
288   /**
289    * Data retransmission timeout
290    */
291   struct GNUNET_TIME_Relative data_retransmit_timeout;
292
293   /**
294    * The state of the protocol associated with this socket
295    */
296   enum State state;
297
298   /**
299    * Whether testing mode is active or not
300    */
301   int testing_active;
302
303   /**
304    * Is receive closed
305    */
306   int receive_closed;
307
308   /**
309    * Is transmission closed
310    */
311   int transmit_closed;
312
313   /**
314    * The application port number (type: uint32_t)
315    */
316   GNUNET_MESH_ApplicationType port;
317
318   /**
319    * The write sequence number to be set incase of testing
320    */
321   uint32_t testing_set_write_sequence_number_value;
322
323   /**
324    * Write sequence number. Set to random when sending HELLO(client) and
325    * HELLO_ACK(server) 
326    */
327   uint32_t write_sequence_number;
328
329   /**
330    * Read sequence number. This number's value is determined during handshake
331    */
332   uint32_t read_sequence_number;
333
334   /**
335    * The receiver buffer size
336    */
337   uint32_t receive_buffer_size;
338
339   /**
340    * The receiver buffer boundaries
341    */
342   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
343
344   /**
345    * receiver's available buffer after the last acknowledged packet
346    */
347   uint32_t receiver_window_available;
348
349   /**
350    * The offset pointer used during write operation
351    */
352   uint32_t write_offset;
353
354   /**
355    * The offset after which we are expecting data
356    */
357   uint32_t read_offset;
358
359   /**
360    * The offset upto which user has read from the received buffer
361    */
362   uint32_t copy_offset;
363
364   /**
365    * The maximum size of the data message payload this stream handle can send
366    */
367   uint16_t max_payload_size;
368 };
369
370
371 /**
372  * A socket for listening
373  */
374 struct GNUNET_STREAM_ListenSocket
375 {
376   /**
377    * The mesh handle
378    */
379   struct GNUNET_MESH_Handle *mesh;
380
381   /**
382    * Handle to statistics
383    */
384   struct GNUNET_STATISTICS_Handle *stat_handle;
385
386   /**
387    * Our configuration
388    */
389   struct GNUNET_CONFIGURATION_Handle *cfg;
390
391   /**
392    * Handle to the lock manager service
393    */
394   struct GNUNET_LOCKMANAGER_Handle *lockmanager;
395
396   /**
397    * The active LockingRequest from lockmanager
398    */
399   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
400
401   /**
402    * Callback to call after acquring a lock and listening
403    */
404   GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
405
406   /**
407    * The callback function which is called after successful opening socket
408    */
409   GNUNET_STREAM_ListenCallback listen_cb;
410
411   /**
412    * The call back closure
413    */
414   void *listen_cb_cls;
415
416   /**
417    * The service port
418    */
419   GNUNET_MESH_ApplicationType port;
420   
421   /**
422    * The id of the lockmanager timeout task
423    */
424   GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
425
426   /**
427    * The retransmit timeout
428    */
429   struct GNUNET_TIME_Relative retransmit_timeout;
430   
431   /**
432    * Listen enabled?
433    */
434   int listening;
435
436   /**
437    * Whether testing mode is active or not
438    */
439   int testing_active;
440
441   /**
442    * The write sequence number to be set incase of testing
443    */
444   uint32_t testing_set_write_sequence_number_value;
445
446   /**
447    * The maximum size of the data message payload this stream handle can send
448    */
449   uint16_t max_payload_size;
450
451 };
452
453
454 /**
455  * The IO Write Handle
456  */
457 struct GNUNET_STREAM_WriteHandle
458 {
459   /**
460    * The socket to which this write handle is associated
461    */
462   struct GNUNET_STREAM_Socket *socket;
463
464   /**
465    * The write continuation callback
466    */
467   GNUNET_STREAM_CompletionContinuation write_cont;
468
469   /**
470    * Write continuation closure
471    */
472   void *write_cont_cls;
473
474   /**
475    * The packet_buffers associated with this Handle
476    */
477   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
478
479   /**
480    * The bitmap of this IOHandle; Corresponding bit for a message is set when
481    * it has been acknowledged by the receiver
482    */
483   GNUNET_STREAM_AckBitmap ack_bitmap;
484
485   /**
486    * Number of bytes in this write handle
487    */
488   size_t size;
489
490   /**
491    * Number of packets already transmitted from this IO handle. Retransmitted
492    * packets are not taken into account here. This is used to determine which
493    * packets account for retransmission and which packets occupy buffer space at
494    * the receiver.
495    */
496   unsigned int packets_sent;
497
498   /**
499    * The maximum of the base numbers of the received acks
500    */
501   uint32_t max_ack_base_num;
502
503 };
504
505
506 /**
507  * The IO Read Handle
508  */
509 struct GNUNET_STREAM_ReadHandle
510 {
511   /**
512    * The socket to which this read handle is associated
513    */
514   struct GNUNET_STREAM_Socket *socket;
515   
516   /**
517    * Callback for the read processor
518    */
519   GNUNET_STREAM_DataProcessor proc;
520
521   /**
522    * The closure pointer for the read processor callback
523    */
524   void *proc_cls;
525
526   /**
527    * Task identifier for the read io timeout task
528    */
529   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
530
531   /**
532    * Task scheduled to continue a read operation.
533    */
534   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
535
536   /**
537    * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call
538    * the read processor task
539    */
540   GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id;
541 };
542
543
544 /**
545  * Handle for Shutdown
546  */
547 struct GNUNET_STREAM_ShutdownHandle
548 {
549   /**
550    * The socket associated with this shutdown handle
551    */
552   struct GNUNET_STREAM_Socket *socket;
553
554   /**
555    * Shutdown completion callback
556    */
557   GNUNET_STREAM_ShutdownCompletion completion_cb;
558
559   /**
560    * Closure for completion callback
561    */
562   void *completion_cls;
563
564   /**
565    * Close message retransmission task id
566    */
567   GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
568
569   /**
570    * Task scheduled to call the shutdown continuation callback
571    */
572   GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
573
574   /**
575    * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
576    */
577   int operation;  
578 };
579
580
581 /**
582  * Collection of the state necessary to read and write gnunet messages 
583  * to a stream socket. Should be used as closure for stream_data_processor.
584  */
585 struct MQStreamState
586 {
587   /**
588    * Message stream tokenizer for the data received from the
589    * stream socket.
590    */
591   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
592
593   /**
594    * The stream socket to use for receiving and transmitting
595    * messages with the message queue.
596    */
597   struct GNUNET_STREAM_Socket *socket;
598
599   /**
600    * Current read handle, NULL if no read active.
601    */
602   struct GNUNET_STREAM_ReadHandle *rh;
603
604   /**
605    * Current write handle, NULL if no write active.
606    */
607   struct GNUNET_STREAM_WriteHandle *wh;
608 };
609
610
611
612 /**
613  * Default value in seconds for various timeouts
614  */
615 static const unsigned int default_timeout = 10;
616
617 /**
618  * The domain name for locks we use here
619  */
620 static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
621
622 /**
623  * Callback function for sending queued message
624  *
625  * @param cls closure the socket
626  * @param size number of bytes available in buf
627  * @param buf where the callee should write the message
628  * @return number of bytes written to buf
629  */
630 static size_t
631 send_message_notify (void *cls, size_t size, void *buf)
632 {
633   struct GNUNET_STREAM_Socket *socket = cls;
634   struct MessageQueue *head;
635   size_t ret;
636
637   socket->transmit_handle = NULL; /* Remove the transmit handle */
638   head = socket->queue_head;
639   if (NULL == head)
640     return 0; /* just to be safe */
641   if (0 == size)                /* request timed out */
642   {
643     socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
644         (socket->mesh_retry_timeout);    
645     LOG (GNUNET_ERROR_TYPE_DEBUG,
646          "%s: Message sending to MESH timed out. Retrying in %s \n",
647          GNUNET_i2s (&socket->other_peer),
648          GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
649                                                  GNUNET_YES));
650     socket->transmit_handle = 
651         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
652                                            GNUNET_NO, /* Corking */
653                                            socket->mesh_retry_timeout,
654                                            &socket->other_peer,
655                                            ntohs (head->message->size),
656                                            &send_message_notify,
657                                            socket);
658     return 0;
659   }
660   ret = ntohs (head->message->size);
661   GNUNET_assert (size >= ret);
662   memcpy (buf, head->message, ret);
663   if (NULL != head->finish_cb)
664   {
665     head->finish_cb (head->finish_cb_cls, socket);
666   }
667   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
668                                socket->queue_tail,
669                                head);
670   GNUNET_free (head->message);
671   GNUNET_free (head);
672   if (NULL != socket->transmit_handle)
673     return ret; /* 'finish_cb' might have triggered message already! */
674   head = socket->queue_head;
675   if (NULL != head)    /* more pending messages to send */
676   {
677     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
678     socket->transmit_handle = 
679         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
680                                            GNUNET_NO, /* Corking */
681                                            socket->mesh_retry_timeout,
682                                            &socket->other_peer,
683                                            ntohs (head->message->size),
684                                            &send_message_notify,
685                                            socket);
686   }
687   return ret;
688 }
689
690
691 /**
692  * Queues a message for sending using the mesh connection of a socket
693  *
694  * @param socket the socket whose mesh connection is used
695  * @param message the message to be sent
696  * @param finish_cb the callback to be called when the message is sent
697  * @param finish_cb_cls the closure for the callback
698  * @param urgent set to GNUNET_YES to add the message to the beginning of the
699  *          queue; GNUNET_NO to add at the tail
700  */
701 static void
702 queue_message (struct GNUNET_STREAM_Socket *socket,
703                struct GNUNET_MessageHeader *message,
704                SendFinishCallback finish_cb,
705                void *finish_cb_cls,
706                int urgent)
707 {
708   struct MessageQueue *queue_entity;
709
710   GNUNET_assert ((ntohs (message->type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
711                  && (ntohs (message->type)
712                      <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
713   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Queueing message of type %d and size %d\n",
714        GNUNET_i2s (&socket->other_peer),
715        ntohs (message->type),ntohs (message->size));
716   GNUNET_assert (NULL != message);
717   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
718   queue_entity->message = message;
719   queue_entity->finish_cb = finish_cb;
720   queue_entity->finish_cb_cls = finish_cb_cls;
721   if (GNUNET_YES == urgent)
722   {
723     GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
724                                  queue_entity);
725     if (NULL != socket->transmit_handle)
726     {
727       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
728       socket->transmit_handle = NULL;
729     }
730   }
731   else
732     GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
733                                       socket->queue_tail,
734                                       queue_entity);
735   if (NULL == socket->transmit_handle)
736   {
737     socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
738     socket->transmit_handle = 
739         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
740                                            GNUNET_NO, /* Corking */
741                                            socket->mesh_retry_timeout,
742                                            &socket->other_peer,
743                                            ntohs (message->size),
744                                            &send_message_notify,
745                                            socket);
746   }
747 }
748
749
750 /**
751  * Copies a message and queues it for sending using the mesh connection of
752  * given socket 
753  *
754  * @param socket the socket whose mesh connection is used
755  * @param message the message to be sent
756  * @param finish_cb the callback to be called when the message is sent
757  * @param finish_cb_cls the closure for the callback
758  */
759 static void
760 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
761                         const struct GNUNET_MessageHeader *message,
762                         SendFinishCallback finish_cb,
763                         void *finish_cb_cls)
764 {
765   struct GNUNET_MessageHeader *msg_copy;
766   uint16_t size;
767   
768   size = ntohs (message->size);
769   msg_copy = GNUNET_malloc (size);
770   memcpy (msg_copy, message, size);
771   queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
772 }
773
774
775 /**
776  * Writes data using the given socket. The amount of data written is limited by
777  * the receiver_window_size
778  *
779  * @param socket the socket to use
780  */
781 static void 
782 write_data (struct GNUNET_STREAM_Socket *socket);
783
784
785 /**
786  * Task for retransmitting data messages if they aren't ACK before their ack
787  * deadline 
788  *
789  * @param cls the socket
790  * @param tc the Task context
791  */
792 static void
793 data_retransmission_task (void *cls,
794                           const struct GNUNET_SCHEDULER_TaskContext *tc)
795 {
796   struct GNUNET_STREAM_Socket *socket = cls;
797   
798   socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
799   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
800     return;
801   LOG (GNUNET_ERROR_TYPE_DEBUG,
802        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
803   write_data (socket);
804 }
805
806
807 /**
808  * Task for sending ACK message
809  *
810  * @param cls the socket
811  * @param tc the Task context
812  */
813 static void
814 ack_task (void *cls,
815           const struct GNUNET_SCHEDULER_TaskContext *tc)
816 {
817   struct GNUNET_STREAM_Socket *socket = cls;
818   struct GNUNET_STREAM_AckMessage *ack_msg;
819
820   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
821   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
822     return;
823   /* Create the ACK Message */
824   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
825   ack_msg->header.size = htons (sizeof (struct 
826                                                GNUNET_STREAM_AckMessage));
827   ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
828   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
829   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
830   ack_msg->receive_window_remaining = 
831     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
832   /* Queue up ACK for immediate sending */
833   queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
834 }
835
836
837 /**
838  * Retransmission task for shutdown messages
839  *
840  * @param cls the shutdown handle
841  * @param tc the Task Context
842  */
843 static void
844 close_msg_retransmission_task (void *cls,
845                                const struct GNUNET_SCHEDULER_TaskContext *tc)
846 {
847   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
848   struct GNUNET_MessageHeader *msg;
849   struct GNUNET_STREAM_Socket *socket;
850
851   shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
852   GNUNET_assert (NULL != shutdown_handle);
853   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
854     return;
855   socket = shutdown_handle->socket;
856   msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
857   msg->size = htons (sizeof (struct GNUNET_MessageHeader));
858   switch (shutdown_handle->operation)
859   {
860   case SHUT_RDWR:
861     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
862     break;
863   case SHUT_RD:
864     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
865     break;
866   case SHUT_WR:
867     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
868     break;
869   default:
870     GNUNET_free (msg);
871     shutdown_handle->close_msg_retransmission_task_id = 
872       GNUNET_SCHEDULER_NO_TASK;
873     return;
874   }
875   queue_message (socket, msg, NULL, NULL, GNUNET_NO);
876   shutdown_handle->close_msg_retransmission_task_id =
877     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
878                                   &close_msg_retransmission_task,
879                                   shutdown_handle);
880 }
881
882
883 /**
884  * Function to modify a bit in GNUNET_STREAM_AckBitmap
885  *
886  * @param bitmap the bitmap to modify
887  * @param bit the bit number to modify
888  * @param value GNUNET_YES to on, GNUNET_NO to off
889  */
890 static void
891 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
892                       unsigned int bit, 
893                       int value)
894 {
895   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
896   if (GNUNET_YES == value)
897     *bitmap |= (1LL << bit);
898   else
899     *bitmap &= ~(1LL << bit);
900 }
901
902
903 /**
904  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
905  *
906  * @param bitmap address of the bitmap that has to be checked
907  * @param bit the bit number to check
908  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
909  */
910 static uint8_t
911 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
912                       unsigned int bit)
913 {
914   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
915   return 0 != (*bitmap & (1LL << bit));
916 }
917
918
919 /**
920  * Writes data using the given socket. The amount of data written is limited by
921  * the receiver_window_size
922  *
923  * @param socket the socket to use
924  */
925 static void 
926 write_data (struct GNUNET_STREAM_Socket *socket)
927 {
928   struct GNUNET_STREAM_WriteHandle *io_handle = socket->write_handle;
929   unsigned int packet;
930   
931   for (packet=0; packet < io_handle->packets_sent; packet++)
932   {
933     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
934                                            packet))
935     {
936       LOG (GNUNET_ERROR_TYPE_DEBUG,
937            "%s: Retransmitting DATA message with sequence %u\n",
938            GNUNET_i2s (&socket->other_peer),
939            ntohl (io_handle->messages[packet]->sequence_number));
940       copy_and_queue_message (socket,
941                               &io_handle->messages[packet]->header,
942                               NULL,
943                               NULL);
944     }
945   }
946   /* Now send new packets if there is enough buffer space */
947   while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
948          (NULL != io_handle->messages[packet]) &&
949          (socket->receiver_window_available 
950           >= ntohs (io_handle->messages[packet]->header.size)))
951   {
952     socket->receiver_window_available -= 
953       ntohs (io_handle->messages[packet]->header.size);
954     LOG (GNUNET_ERROR_TYPE_DEBUG,
955          "%s: Placing DATA message with sequence %u in send queue\n",
956          GNUNET_i2s (&socket->other_peer),
957          ntohl (io_handle->messages[packet]->sequence_number));
958     copy_and_queue_message (socket,
959                             &io_handle->messages[packet]->header,
960                             NULL,
961                             NULL);
962     packet++;
963   }
964   io_handle->packets_sent = packet;
965   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
966   {
967     socket->data_retransmit_timeout = GNUNET_TIME_STD_BACKOFF
968         (socket->data_retransmit_timeout);
969     socket->data_retransmission_task_id = 
970         GNUNET_SCHEDULER_add_delayed (socket->data_retransmit_timeout,
971                                       &data_retransmission_task,
972                                       socket);
973   }
974 }
975
976
977 /**
978  * Cleansup the sockets read handle
979  *
980  * @param socket the socket whose read handle has to be cleanedup
981  */
982 static void
983 cleanup_read_handle (struct GNUNET_STREAM_Socket *socket)
984 {
985   struct GNUNET_STREAM_ReadHandle *read_handle;
986
987   read_handle = socket->read_handle;
988   /* Read io time task should be there; if it is already executed then this
989   read handle is not valid; However upon scheduler shutdown the read io task
990   may be executed before */
991   if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id)
992     GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
993   /* reading task may be present; if so we have to stop it */
994   if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id)
995     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
996   if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id)
997     GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id);
998   GNUNET_free (read_handle);
999   socket->read_handle = NULL;
1000 }
1001
1002
1003 /**
1004  * Task for calling the read processor
1005  *
1006  * @param cls the socket
1007  * @param tc the task context
1008  */
1009 static void
1010 call_read_processor (void *cls,
1011                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1012 {
1013   struct GNUNET_STREAM_Socket *socket = cls;
1014   struct GNUNET_STREAM_ReadHandle *read_handle;
1015   GNUNET_STREAM_DataProcessor proc;
1016   void *proc_cls;
1017   size_t read_size;
1018   size_t valid_read_size;
1019   unsigned int packet;
1020   uint32_t sequence_increase;
1021   uint32_t offset_increase;
1022
1023   read_handle = socket->read_handle;
1024   GNUNET_assert (NULL != read_handle);
1025   read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1026   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1027     return;
1028   if (NULL == socket->receive_buffer) 
1029     return;
1030   GNUNET_assert (NULL != read_handle->proc);
1031   /* Check the bitmap for any holes */
1032   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1033   {
1034     if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
1035                                            packet))
1036       break;
1037   }
1038   /* We only call read processor if we have the first packet */
1039   GNUNET_assert (0 < packet);
1040   valid_read_size = 
1041     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
1042   GNUNET_assert (0 != valid_read_size);
1043   proc = read_handle->proc;
1044   proc_cls = read_handle->proc_cls;
1045   cleanup_read_handle (socket);
1046   /* Call the data processor */
1047   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
1048        GNUNET_i2s (&socket->other_peer));
1049   read_size = proc (proc_cls, GNUNET_STREAM_OK,
1050                     socket->receive_buffer + socket->copy_offset,
1051                     valid_read_size);
1052   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
1053        GNUNET_i2s (&socket->other_peer), read_size);
1054   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
1055        GNUNET_i2s (&socket->other_peer));
1056   GNUNET_assert (read_size <= valid_read_size);
1057   socket->copy_offset += read_size;
1058   /* Determine upto which packet we can remove from the buffer */
1059   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1060   {
1061     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1062     { 
1063       packet++; 
1064       break;
1065     }
1066     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1067       break;
1068   }
1069   /* If no packets can be removed we can't move the buffer */
1070   if (0 == packet) 
1071     return;
1072   sequence_increase = packet;
1073   LOG (GNUNET_ERROR_TYPE_DEBUG,
1074        "%s: Sequence increase after read processor completion: %u\n",
1075        GNUNET_i2s (&socket->other_peer), sequence_increase);
1076   /* Shift the data in the receive buffer */
1077   socket->receive_buffer = 
1078     memmove (socket->receive_buffer,
1079              socket->receive_buffer 
1080              + socket->receive_buffer_boundaries[sequence_increase-1],
1081              socket->receive_buffer_size
1082              - socket->receive_buffer_boundaries[sequence_increase-1]);
1083   /* Shift the bitmap */
1084   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1085   /* Set read_sequence_number */
1086   socket->read_sequence_number += sequence_increase;
1087   /* Set read_offset */
1088   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1089   socket->read_offset += offset_increase;
1090   /* Fix copy_offset */
1091   GNUNET_assert (offset_increase <= socket->copy_offset);
1092   socket->copy_offset -= offset_increase;
1093   /* Fix relative boundaries */
1094   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1095   {
1096     if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1097     {
1098       uint32_t ahead_buffer_boundary;
1099
1100       ahead_buffer_boundary = 
1101         socket->receive_buffer_boundaries[packet + sequence_increase];
1102       if (0 == ahead_buffer_boundary)
1103         socket->receive_buffer_boundaries[packet] = 0;
1104       else
1105       {
1106         GNUNET_assert (offset_increase < ahead_buffer_boundary);
1107         socket->receive_buffer_boundaries[packet] = 
1108           ahead_buffer_boundary - offset_increase;
1109       }
1110     }
1111     else
1112       socket->receive_buffer_boundaries[packet] = 0;
1113   }
1114 }
1115
1116
1117 /**
1118  * Cancels the existing read io handle
1119  *
1120  * @param cls the closure from the SCHEDULER call
1121  * @param tc the task context
1122  */
1123 static void
1124 read_io_timeout (void *cls,
1125                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1126 {
1127   struct GNUNET_STREAM_Socket *socket = cls;
1128   struct GNUNET_STREAM_ReadHandle *read_handle;
1129   GNUNET_STREAM_DataProcessor proc;
1130   void *proc_cls;
1131
1132   read_handle = socket->read_handle;
1133   GNUNET_assert (NULL != read_handle);
1134   read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1135   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1136     return;
1137   if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1138   {
1139     LOG (GNUNET_ERROR_TYPE_DEBUG,
1140          "%s: Read task timedout - Cancelling it\n",
1141          GNUNET_i2s (&socket->other_peer));
1142     GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1143     read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1144   }
1145   proc = read_handle->proc;
1146   proc_cls = read_handle->proc_cls;
1147   GNUNET_free (read_handle);
1148   socket->read_handle = NULL;
1149   /* Call the read processor to signal timeout */
1150   proc (proc_cls,
1151         GNUNET_STREAM_TIMEOUT,
1152         NULL,
1153         0);
1154 }
1155
1156
1157 /**
1158  * Handler for DATA messages; Same for both client and server
1159  *
1160  * @param socket the socket through which the ack was received
1161  * @param tunnel connection to the other end
1162  * @param sender who sent the message
1163  * @param msg the data message
1164  * @param atsi performance data for the connection
1165  * @return GNUNET_OK to keep the connection open,
1166  *         GNUNET_SYSERR to close it (signal serious error)
1167  */
1168 static int
1169 handle_data (struct GNUNET_STREAM_Socket *socket,
1170              struct GNUNET_MESH_Tunnel *tunnel,
1171              const struct GNUNET_PeerIdentity *sender,
1172              const struct GNUNET_STREAM_DataMessage *msg,
1173              const struct GNUNET_ATS_Information*atsi)
1174 {
1175   const void *payload;
1176   struct GNUNET_TIME_Relative ack_deadline_rel;
1177   uint32_t bytes_needed;
1178   uint32_t relative_offset;
1179   uint32_t relative_sequence_number;
1180   uint16_t size;
1181
1182   size = htons (msg->header.size);
1183   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1184   {
1185     GNUNET_break_op (0);
1186     return GNUNET_SYSERR;
1187   }
1188   if (0 != memcmp (sender, &socket->other_peer,
1189                    sizeof (struct GNUNET_PeerIdentity)))
1190   {
1191     LOG (GNUNET_ERROR_TYPE_DEBUG,
1192          "%s: Received DATA from non-confirming peer\n",
1193          GNUNET_i2s (&socket->other_peer));
1194     return GNUNET_YES;
1195   }
1196   switch (socket->state)
1197   {
1198   case STATE_ESTABLISHED:
1199   case STATE_TRANSMIT_CLOSED:
1200   case STATE_TRANSMIT_CLOSE_WAIT:      
1201     /* check if the message's sequence number is in the range we are
1202        expecting */
1203     relative_sequence_number = 
1204       ntohl (msg->sequence_number) - socket->read_sequence_number;
1205     if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1206     {
1207       LOG (GNUNET_ERROR_TYPE_DEBUG,
1208            "%s: Ignoring received message with sequence number %u\n",
1209            GNUNET_i2s (&socket->other_peer),
1210            ntohl (msg->sequence_number));
1211       /* Start ACK sending task if one is not already present */
1212       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1213       {
1214         socket->ack_task_id = 
1215           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1216                                         (msg->ack_deadline),
1217                                         &ack_task,
1218                                         socket);
1219       }
1220       return GNUNET_YES;
1221     }      
1222     /* Check if we have already seen this message */
1223     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1224                                             relative_sequence_number))
1225     {
1226       LOG (GNUNET_ERROR_TYPE_DEBUG,
1227            "%s: Ignoring already received message with sequence number %u\n",
1228            GNUNET_i2s (&socket->other_peer),
1229            ntohl (msg->sequence_number));
1230       /* Start ACK sending task if one is not already present */
1231       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1232       {
1233         socket->ack_task_id = 
1234           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1235                                         (msg->ack_deadline), &ack_task, socket);
1236       }
1237       return GNUNET_YES;
1238     }
1239     LOG (GNUNET_ERROR_TYPE_DEBUG,
1240          "%1$s: Receiving DATA with sequence number: %2$u and size: %3$d from "
1241          "%1$s\n", GNUNET_i2s (&socket->other_peer),
1242          ntohl (msg->sequence_number), ntohs (msg->header.size));
1243     /* Check if we have to allocate the buffer */
1244     size -= sizeof (struct GNUNET_STREAM_DataMessage);
1245     relative_offset = ntohl (msg->offset) - socket->read_offset;
1246     bytes_needed = relative_offset + size;
1247     if (bytes_needed > socket->receive_buffer_size)
1248     {
1249       if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1250       {
1251         socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1252                                                  bytes_needed);
1253         socket->receive_buffer_size = bytes_needed;
1254       }
1255       else
1256       {
1257         LOG (GNUNET_ERROR_TYPE_DEBUG,
1258              "%s: Cannot accommodate packet %d as buffer is full\n",
1259              GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1260         return GNUNET_YES;
1261       }
1262     }
1263     /* Copy Data to buffer */
1264     payload = &msg[1];
1265     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1266     memcpy (socket->receive_buffer + relative_offset, payload, size);
1267     socket->receive_buffer_boundaries[relative_sequence_number] = 
1268         relative_offset + size;
1269     /* Modify the ACK bitmap */
1270     ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1271                           GNUNET_YES);
1272     /* Start ACK sending task if one is not already present */
1273     ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1274     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1275     {
1276       ack_deadline_rel = 
1277           GNUNET_TIME_relative_min (ack_deadline_rel,
1278                                     GNUNET_TIME_relative_multiply
1279                                     (GNUNET_TIME_UNIT_SECONDS, 300));
1280       socket->ack_task_id = 
1281           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
1282                                         (msg->ack_deadline), &ack_task, socket);
1283       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1284       socket->ack_time_deadline = ack_deadline_rel;
1285     }
1286     else
1287     {
1288       struct GNUNET_TIME_Relative ack_time_past;
1289       struct GNUNET_TIME_Relative ack_time_remaining;
1290       struct GNUNET_TIME_Relative ack_time_min;
1291       ack_time_past = 
1292           GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1293       ack_time_remaining = GNUNET_TIME_relative_subtract
1294           (socket->ack_time_deadline, ack_time_past);
1295       ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1296                                                ack_deadline_rel);
1297       if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1298                       sizeof (struct GNUNET_TIME_Relative)))
1299       {
1300         ack_deadline_rel = ack_time_min;
1301         GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1302         socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1303                                                             &ack_task, socket);
1304         socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1305         socket->ack_time_deadline = ack_deadline_rel;
1306       }
1307     }
1308     if ((NULL != socket->read_handle) /* A read handle is waiting */
1309         /* There is no current read task */
1310         && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1311         /* We have the first packet */
1312         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1313     {
1314       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1315            GNUNET_i2s (&socket->other_peer));
1316       socket->read_handle->read_task_id =
1317           GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1318     }
1319     break;
1320   default:
1321     LOG (GNUNET_ERROR_TYPE_DEBUG,
1322          "%s: Received data message when it cannot be handled\n",
1323          GNUNET_i2s (&socket->other_peer));
1324     break;
1325   }
1326   return GNUNET_YES;
1327 }
1328
1329
1330 /**
1331  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1332  *
1333  * @param cls the socket (set from GNUNET_MESH_connect)
1334  * @param tunnel connection to the other end
1335  * @param tunnel_ctx place to store local state associated with the tunnel
1336  * @param sender who sent the message
1337  * @param message the actual message
1338  * @param atsi performance data for the connection
1339  * @return GNUNET_OK to keep the connection open,
1340  *         GNUNET_SYSERR to close it (signal serious error)
1341  */
1342 static int
1343 client_handle_data (void *cls,
1344                     struct GNUNET_MESH_Tunnel *tunnel,
1345                     void **tunnel_ctx,
1346                     const struct GNUNET_PeerIdentity *sender,
1347                     const struct GNUNET_MessageHeader *message,
1348                     const struct GNUNET_ATS_Information*atsi)
1349 {
1350   struct GNUNET_STREAM_Socket *socket = cls;
1351
1352   return handle_data (socket, tunnel, sender, 
1353                       (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1354 }
1355
1356
1357 /**
1358  * Callback to set state to ESTABLISHED
1359  *
1360  * @param cls the closure NULL;
1361  * @param socket the socket to requiring state change
1362  */
1363 static void
1364 set_state_established (void *cls,
1365                        struct GNUNET_STREAM_Socket *socket)
1366 {
1367   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1368        "%s: Attaining ESTABLISHED state\n",
1369        GNUNET_i2s (&socket->other_peer));
1370   socket->write_offset = 0;
1371   socket->read_offset = 0;
1372   socket->state = STATE_ESTABLISHED;
1373   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1374                  socket->control_retransmission_task_id);
1375   GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1376   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1377   if (NULL != socket->lsocket)
1378   {
1379     LOG (GNUNET_ERROR_TYPE_DEBUG,
1380          "%s: Calling listen callback\n",
1381          GNUNET_i2s (&socket->other_peer));
1382     if (GNUNET_SYSERR == 
1383         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1384                                     socket,
1385                                     &socket->other_peer))
1386     {
1387       socket->state = STATE_CLOSED;
1388       /* FIXME: We should close in a decent way (send RST) */
1389       GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1390       GNUNET_free (socket);
1391     }
1392   }
1393   else
1394     socket->open_cb (socket->open_cls, socket);
1395 }
1396
1397
1398 /**
1399  * Callback to set state to HELLO_WAIT
1400  *
1401  * @param cls the closure from queue_message
1402  * @param socket the socket to requiring state change
1403  */
1404 static void
1405 set_state_hello_wait (void *cls,
1406                       struct GNUNET_STREAM_Socket *socket)
1407 {
1408   GNUNET_assert (STATE_INIT == socket->state);
1409   LOG (GNUNET_ERROR_TYPE_DEBUG,
1410        "%s: Attaining HELLO_WAIT state\n",
1411        GNUNET_i2s (&socket->other_peer));
1412   socket->state = STATE_HELLO_WAIT;
1413 }
1414
1415
1416 /**
1417  * Callback to set state to CLOSE_WAIT
1418  *
1419  * @param cls the closure from queue_message
1420  * @param socket the socket requiring state change
1421  */
1422 static void
1423 set_state_close_wait (void *cls,
1424                       struct GNUNET_STREAM_Socket *socket)
1425 {
1426   LOG (GNUNET_ERROR_TYPE_DEBUG,
1427        "%s: Attaing CLOSE_WAIT state\n",
1428        GNUNET_i2s (&socket->other_peer));
1429   socket->state = STATE_CLOSE_WAIT;
1430   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1431   socket->receive_buffer = NULL;
1432   socket->receive_buffer_size = 0;
1433 }
1434
1435
1436 /**
1437  * Callback to set state to RECEIVE_CLOSE_WAIT
1438  *
1439  * @param cls the closure from queue_message
1440  * @param socket the socket requiring state change
1441  */
1442 static void
1443 set_state_receive_close_wait (void *cls,
1444                               struct GNUNET_STREAM_Socket *socket)
1445 {
1446   LOG (GNUNET_ERROR_TYPE_DEBUG,
1447        "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1448        GNUNET_i2s (&socket->other_peer));
1449   socket->state = STATE_RECEIVE_CLOSE_WAIT;
1450   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1451   socket->receive_buffer = NULL;
1452   socket->receive_buffer_size = 0;
1453 }
1454
1455
1456 /**
1457  * Callback to set state to TRANSMIT_CLOSE_WAIT
1458  *
1459  * @param cls the closure from queue_message
1460  * @param socket the socket requiring state change
1461  */
1462 static void
1463 set_state_transmit_close_wait (void *cls,
1464                                struct GNUNET_STREAM_Socket *socket)
1465 {
1466   LOG (GNUNET_ERROR_TYPE_DEBUG,
1467        "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1468        GNUNET_i2s (&socket->other_peer));
1469   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1470 }
1471
1472
1473 /**
1474  * Callback to set state to CLOSED
1475  *
1476  * @param cls the closure from queue_message
1477  * @param socket the socket requiring state change
1478  */
1479 static void
1480 set_state_closed (void *cls,
1481                   struct GNUNET_STREAM_Socket *socket)
1482 {
1483   socket->state = STATE_CLOSED;
1484 }
1485
1486
1487 /**
1488  * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1489  *
1490  * @return the generate hello message
1491  */
1492 static struct GNUNET_MessageHeader *
1493 generate_hello (struct GNUNET_STREAM_Socket *socket)
1494 {
1495   struct GNUNET_STREAM_HelloMessage *msg;
1496
1497   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloMessage));
1498   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1499   msg->header.size = htons (sizeof (struct GNUNET_STREAM_HelloMessage));
1500   msg->port = GNUNET_htonll ((uint64_t) socket->port);
1501   return &msg->header;
1502 }
1503
1504
1505 /**
1506  * Returns a new HelloAckMessage. Also sets the write sequence number for the
1507  * socket
1508  *
1509  * @param socket the socket for which this HelloAckMessage has to be generated
1510  * @param generate_seq GNUNET_YES to generate the write sequence number,
1511  *          GNUNET_NO to use the existing sequence number
1512  * @return the HelloAckMessage
1513  */
1514 static struct GNUNET_STREAM_HelloAckMessage *
1515 generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1516                     int generate_seq)
1517 {
1518   struct GNUNET_STREAM_HelloAckMessage *msg;
1519
1520   if (GNUNET_YES == generate_seq)
1521   {
1522     if (GNUNET_YES == socket->testing_active)
1523       socket->write_sequence_number =
1524         socket->testing_set_write_sequence_number_value;
1525     else
1526       socket->write_sequence_number = 
1527         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1528     LOG_DEBUG ("%s: write sequence number %u\n",
1529                GNUNET_i2s (&socket->other_peer),
1530                (unsigned int) socket->write_sequence_number);
1531   }
1532   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1533   msg->header.size = 
1534     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1535   msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1536   msg->sequence_number = htonl (socket->write_sequence_number);
1537   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1538   return msg;
1539 }
1540
1541
1542 /**
1543  * Task for retransmitting control messages if they aren't ACK'ed before a
1544  * deadline
1545  *
1546  * @param cls the socket
1547  * @param tc the Task context
1548  */
1549 static void
1550 control_retransmission_task (void *cls,
1551                              const struct GNUNET_SCHEDULER_TaskContext *tc)
1552 {
1553   struct GNUNET_STREAM_Socket *socket = cls;
1554     
1555   socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1556   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1557     return;
1558   LOG_DEBUG ("%s: Retransmitting a control message\n",
1559                  GNUNET_i2s (&socket->other_peer));
1560   switch (socket->state)
1561   {
1562   case STATE_INIT:    
1563     GNUNET_break (0);
1564     break;
1565   case STATE_LISTEN:
1566     GNUNET_break (0);
1567     break;
1568   case STATE_HELLO_WAIT:
1569     if (NULL == socket->lsocket) /* We are client */
1570       queue_message (socket, generate_hello (socket), NULL, NULL, GNUNET_NO);
1571     else
1572       queue_message (socket,
1573                      (struct GNUNET_MessageHeader *)
1574                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1575                      GNUNET_NO);
1576     socket->control_retransmission_task_id =
1577     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1578                                   &control_retransmission_task, socket);
1579     break;
1580   case STATE_ESTABLISHED:
1581     if (NULL == socket->lsocket)
1582       queue_message (socket,
1583                      (struct GNUNET_MessageHeader *)
1584                      generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1585                      GNUNET_NO);
1586     else
1587       GNUNET_break (0);
1588     break;
1589   default:
1590     GNUNET_break (0);
1591   }  
1592 }
1593
1594
1595 /**
1596  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1597  *
1598  * @param cls the socket (set from GNUNET_MESH_connect)
1599  * @param tunnel connection to the other end
1600  * @param tunnel_ctx this is NULL
1601  * @param sender who sent the message
1602  * @param message the actual message
1603  * @param atsi performance data for the connection
1604  * @return GNUNET_OK to keep the connection open,
1605  *         GNUNET_SYSERR to close it (signal serious error)
1606  */
1607 static int
1608 client_handle_hello_ack (void *cls,
1609                          struct GNUNET_MESH_Tunnel *tunnel,
1610                          void **tunnel_ctx,
1611                          const struct GNUNET_PeerIdentity *sender,
1612                          const struct GNUNET_MessageHeader *message,
1613                          const struct GNUNET_ATS_Information*atsi)
1614 {
1615   struct GNUNET_STREAM_Socket *socket = cls;
1616   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1617   struct GNUNET_STREAM_HelloAckMessage *reply;
1618
1619   if (0 != memcmp (sender, &socket->other_peer,
1620                    sizeof (struct GNUNET_PeerIdentity)))
1621   {
1622     LOG (GNUNET_ERROR_TYPE_DEBUG,
1623          "%s: Received HELLO_ACK from non-confirming peer\n",
1624          GNUNET_i2s (&socket->other_peer));
1625     return GNUNET_YES;
1626   }
1627   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1628   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1629        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1630   GNUNET_assert (socket->tunnel == tunnel);
1631   switch (socket->state)
1632   {
1633   case STATE_HELLO_WAIT:
1634     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1635     LOG_DEBUG ("%s: Read sequence number %u\n", 
1636                GNUNET_i2s (&socket->other_peer), socket->read_sequence_number);
1637     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1638     reply = generate_hello_ack (socket, GNUNET_YES);
1639     queue_message (socket, &reply->header, &set_state_established,
1640                    NULL, GNUNET_NO);    
1641     return GNUNET_OK;
1642   case STATE_ESTABLISHED:
1643     // call statistics (# ACKs ignored++)
1644     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1645                    socket->control_retransmission_task_id);
1646     socket->control_retransmission_task_id =
1647       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1648     return GNUNET_OK;
1649   default:
1650     LOG_DEBUG ("%1$s: Server %1$s sent HELLO_ACK when in state %2$d\n", 
1651                GNUNET_i2s (&socket->other_peer), socket->state);
1652     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1653     return GNUNET_SYSERR;
1654   }
1655 }
1656
1657
1658 /**
1659  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1660  *
1661  * @param cls the socket (set from GNUNET_MESH_connect)
1662  * @param tunnel connection to the other end
1663  * @param tunnel_ctx this is NULL
1664  * @param sender who sent the message
1665  * @param message the actual message
1666  * @param atsi performance data for the connection
1667  * @return GNUNET_OK to keep the connection open,
1668  *         GNUNET_SYSERR to close it (signal serious error)
1669  */
1670 static int
1671 client_handle_reset (void *cls,
1672                      struct GNUNET_MESH_Tunnel *tunnel,
1673                      void **tunnel_ctx,
1674                      const struct GNUNET_PeerIdentity *sender,
1675                      const struct GNUNET_MessageHeader *message,
1676                      const struct GNUNET_ATS_Information*atsi)
1677 {
1678   // struct GNUNET_STREAM_Socket *socket = cls;
1679
1680   return GNUNET_OK;
1681 }
1682
1683
1684 /**
1685  * Frees the socket's receive buffers, marks the socket as receive closed and
1686  * calls the DataProcessor with GNUNET_STREAM_SHUTDOWN status if a read handle
1687  * is present
1688  *
1689  * @param socket the socket
1690  */
1691 static void
1692 do_receive_shutdown (struct GNUNET_STREAM_Socket *socket)
1693 {
1694   socket->receive_closed = GNUNET_YES;  
1695   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1696   socket->receive_buffer = NULL;
1697   socket->receive_buffer_size = 0;
1698   if (NULL != socket->read_handle)
1699   {
1700     GNUNET_STREAM_DataProcessor proc;
1701     void *proc_cls;
1702
1703     proc = socket->read_handle->proc;
1704     proc_cls = socket->read_handle->proc_cls;
1705     GNUNET_STREAM_read_cancel (socket->read_handle);
1706     socket->read_handle = NULL;
1707     if (NULL != proc)
1708       proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
1709   }
1710 }
1711
1712
1713 /**
1714  * Marks the socket as transmit closed and calls the CompletionContinuation with
1715  * GNUNET_STREAM_SHUTDOWN status if a write handle is present
1716  *
1717  * @param socket the socket
1718  */
1719 static void
1720 do_transmit_shutdown (struct GNUNET_STREAM_Socket *socket)
1721 {
1722   socket->transmit_closed = GNUNET_YES;
1723   /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1724      that that stream has been shutdown */
1725   if (NULL != socket->write_handle)
1726   {
1727     GNUNET_STREAM_CompletionContinuation wc;
1728     void *wc_cls;
1729
1730     wc = socket->write_handle->write_cont;
1731     wc_cls = socket->write_handle->write_cont_cls;
1732     GNUNET_STREAM_write_cancel (socket->write_handle);
1733     socket->write_handle = NULL;
1734     if (NULL != wc)
1735       wc (wc_cls,
1736           GNUNET_STREAM_SHUTDOWN, 0);
1737   }
1738 }
1739
1740
1741 /**
1742  * Common message handler for handling TRANSMIT_CLOSE messages
1743  *
1744  * @param socket the socket through which the ack was received
1745  * @param tunnel connection to the other end
1746  * @param sender who sent the message
1747  * @param msg the transmit close message
1748  * @param atsi performance data for the connection
1749  * @return GNUNET_OK to keep the connection open,
1750  *         GNUNET_SYSERR to close it (signal serious error)
1751  */
1752 static int
1753 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1754                        struct GNUNET_MESH_Tunnel *tunnel,
1755                        const struct GNUNET_PeerIdentity *sender,
1756                        const struct GNUNET_MessageHeader *msg,
1757                        const struct GNUNET_ATS_Information*atsi)
1758 {
1759   struct GNUNET_MessageHeader *reply;
1760
1761   switch (socket->state)
1762   {
1763   case STATE_INIT:
1764   case STATE_LISTEN:
1765   case STATE_HELLO_WAIT:
1766     LOG (GNUNET_ERROR_TYPE_DEBUG,
1767          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1768          GNUNET_i2s (&socket->other_peer));
1769     return GNUNET_OK;
1770   default:
1771     break;
1772   }
1773   /* Send TRANSMIT_CLOSE_ACK */
1774   reply = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
1775   reply->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1776   reply->size = htons (sizeof (struct GNUNET_MessageHeader));
1777   queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1778   LOG (GNUNET_ERROR_TYPE_DEBUG, "%1$s: Received TRANSMIT_CLOSE from %1$s\n",
1779        GNUNET_i2s (&socket->other_peer));
1780   switch(socket->state)
1781   {
1782   case STATE_RECEIVE_CLOSED:
1783   case STATE_RECEIVE_CLOSE_WAIT:
1784   case STATE_CLOSE_WAIT:
1785   case STATE_CLOSED:
1786     return GNUNET_OK;
1787   default:
1788     break;
1789   }
1790   do_receive_shutdown (socket);
1791   if (GNUNET_YES == socket->transmit_closed)
1792     socket->state = STATE_CLOSED;
1793   else
1794     socket->state = STATE_RECEIVE_CLOSED;
1795   return GNUNET_OK;
1796 }
1797
1798
1799 /**
1800  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1801  *
1802  * @param cls the socket (set from GNUNET_MESH_connect)
1803  * @param tunnel connection to the other end
1804  * @param tunnel_ctx this is NULL
1805  * @param sender who sent the message
1806  * @param message the actual message
1807  * @param atsi performance data for the connection
1808  * @return GNUNET_OK to keep the connection open,
1809  *         GNUNET_SYSERR to close it (signal serious error)
1810  */
1811 static int
1812 client_handle_transmit_close (void *cls,
1813                               struct GNUNET_MESH_Tunnel *tunnel,
1814                               void **tunnel_ctx,
1815                               const struct GNUNET_PeerIdentity *sender,
1816                               const struct GNUNET_MessageHeader *message,
1817                               const struct GNUNET_ATS_Information*atsi)
1818 {
1819   struct GNUNET_STREAM_Socket *socket = cls;
1820   
1821   return handle_transmit_close (socket,
1822                                 tunnel,
1823                                 sender,
1824                                 (struct GNUNET_MessageHeader *)message,
1825                                 atsi);
1826 }
1827
1828
1829 /**
1830  * Task for calling the shutdown continuation callback
1831  *
1832  * @param cls the socket
1833  * @param tc the scheduler task context
1834  */
1835 static void
1836 call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1837 {
1838   struct GNUNET_STREAM_Socket *socket = cls;
1839   
1840   GNUNET_assert (NULL != socket->shutdown_handle);
1841   socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1842   if (NULL != socket->shutdown_handle->completion_cb)
1843     socket->shutdown_handle->completion_cb
1844         (socket->shutdown_handle->completion_cls,
1845          socket->shutdown_handle->operation);
1846   GNUNET_free (socket->shutdown_handle);
1847   socket->shutdown_handle = NULL;
1848 }
1849
1850
1851 /**
1852  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1853  *
1854  * @param socket the socket
1855  * @param tunnel connection to the other end
1856  * @param sender who sent the message
1857  * @param message the actual message
1858  * @param atsi performance data for the connection
1859  * @param operation the close operation which is being ACK'ed
1860  * @return GNUNET_OK to keep the connection open,
1861  *         GNUNET_SYSERR to close it (signal serious error)
1862  */
1863 static int
1864 handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1865                           struct GNUNET_MESH_Tunnel *tunnel,
1866                           const struct GNUNET_PeerIdentity *sender,
1867                           const struct GNUNET_MessageHeader *message,
1868                           const struct GNUNET_ATS_Information *atsi,
1869                           int operation)
1870 {
1871   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1872
1873   shutdown_handle = socket->shutdown_handle;
1874   if (NULL == shutdown_handle)
1875   {
1876     /* This may happen when the shudown handle is cancelled */
1877     LOG (GNUNET_ERROR_TYPE_DEBUG,
1878          "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1879          GNUNET_i2s (&socket->other_peer));
1880     return GNUNET_OK;
1881   }
1882   switch (operation)
1883   {
1884   case SHUT_RDWR:
1885     switch (socket->state)
1886     {
1887     case STATE_CLOSE_WAIT:
1888       if (SHUT_RDWR != shutdown_handle->operation)
1889       {
1890         LOG (GNUNET_ERROR_TYPE_DEBUG,
1891              "%s: Received CLOSE_ACK when shutdown handle is not for "
1892              "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1893         return GNUNET_OK;
1894       }
1895       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1896            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1897       socket->state = STATE_CLOSED;
1898       break;
1899     default:
1900       LOG (GNUNET_ERROR_TYPE_DEBUG,
1901            "%s: Received CLOSE_ACK when it is not expected\n",
1902            GNUNET_i2s (&socket->other_peer));
1903       return GNUNET_OK;
1904     }
1905     break;
1906   case SHUT_RD:
1907     switch (socket->state)
1908     {
1909     case STATE_RECEIVE_CLOSE_WAIT:
1910       if (SHUT_RD != shutdown_handle->operation)
1911       {
1912         LOG (GNUNET_ERROR_TYPE_DEBUG,
1913              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1914              "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1915         return GNUNET_OK;
1916       }
1917       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1918            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1919       socket->state = STATE_RECEIVE_CLOSED;
1920       break;
1921     default:
1922       LOG (GNUNET_ERROR_TYPE_DEBUG,
1923            "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1924            GNUNET_i2s (&socket->other_peer));
1925       return GNUNET_OK;
1926     }
1927     break;
1928   case SHUT_WR:
1929     switch (socket->state)
1930     {
1931     case STATE_TRANSMIT_CLOSE_WAIT:
1932       if (SHUT_WR != shutdown_handle->operation)
1933       {
1934         LOG (GNUNET_ERROR_TYPE_DEBUG,
1935              "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1936              "is not for SHUT_WR\n",
1937              GNUNET_i2s (&socket->other_peer));
1938         return GNUNET_OK;
1939       }
1940       LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1941            GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1942       socket->state = STATE_TRANSMIT_CLOSED;
1943       break;
1944     default:
1945       LOG (GNUNET_ERROR_TYPE_DEBUG,
1946            "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1947            GNUNET_i2s (&socket->other_peer));          
1948       return GNUNET_OK;
1949     }
1950     break;
1951   default:
1952     GNUNET_assert (0);
1953   }
1954   shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1955       (&call_cont_task, socket);
1956   if (GNUNET_SCHEDULER_NO_TASK
1957       != shutdown_handle->close_msg_retransmission_task_id)
1958   {
1959     GNUNET_SCHEDULER_cancel
1960       (shutdown_handle->close_msg_retransmission_task_id);
1961     shutdown_handle->close_msg_retransmission_task_id =
1962         GNUNET_SCHEDULER_NO_TASK;
1963   }
1964   return GNUNET_OK;
1965 }
1966
1967
1968 /**
1969  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1970  *
1971  * @param cls the socket (set from GNUNET_MESH_connect)
1972  * @param tunnel connection to the other end
1973  * @param tunnel_ctx this is NULL
1974  * @param sender who sent the message
1975  * @param message the actual message
1976  * @param atsi performance data for the connection
1977  * @return GNUNET_OK to keep the connection open,
1978  *         GNUNET_SYSERR to close it (signal serious error)
1979  */
1980 static int
1981 client_handle_transmit_close_ack (void *cls,
1982                                   struct GNUNET_MESH_Tunnel *tunnel,
1983                                   void **tunnel_ctx,
1984                                   const struct GNUNET_PeerIdentity *sender,
1985                                   const struct GNUNET_MessageHeader *message,
1986                                   const struct GNUNET_ATS_Information*atsi)
1987 {
1988   struct GNUNET_STREAM_Socket *socket = cls;
1989
1990   return handle_generic_close_ack (socket,
1991                                    tunnel,
1992                                    sender,
1993                                    (const struct GNUNET_MessageHeader *)
1994                                    message,
1995                                    atsi,
1996                                    SHUT_WR);
1997 }
1998
1999
2000 /**
2001  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2002  *
2003  * @param socket the socket
2004  * @param tunnel connection to the other end
2005  * @param sender who sent the message
2006  * @param message the actual message
2007  * @param atsi performance data for the connection
2008  * @return GNUNET_OK to keep the connection open,
2009  *         GNUNET_SYSERR to close it (signal serious error)
2010  */
2011 static int
2012 handle_receive_close (struct GNUNET_STREAM_Socket *socket,
2013                       struct GNUNET_MESH_Tunnel *tunnel,
2014                       const struct GNUNET_PeerIdentity *sender,
2015                       const struct GNUNET_MessageHeader *message,
2016                       const struct GNUNET_ATS_Information *atsi)
2017 {
2018   struct GNUNET_MessageHeader *receive_close_ack;
2019
2020   switch (socket->state)
2021   {
2022   case STATE_INIT:
2023   case STATE_LISTEN:
2024   case STATE_HELLO_WAIT:
2025     LOG (GNUNET_ERROR_TYPE_DEBUG,
2026          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2027          GNUNET_i2s (&socket->other_peer));
2028     return GNUNET_OK;
2029   default:
2030     break;
2031   }  
2032   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
2033        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));  
2034   receive_close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
2035   receive_close_ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2036   receive_close_ack->type =
2037       htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
2038   queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
2039   switch (socket->state)
2040   {
2041   case STATE_TRANSMIT_CLOSED:
2042   case STATE_TRANSMIT_CLOSE_WAIT:
2043   case STATE_CLOSED:
2044   case STATE_CLOSE_WAIT:
2045     return GNUNET_OK;
2046   default:
2047     break;
2048   }
2049   do_transmit_shutdown (socket);
2050   if (GNUNET_YES == socket->receive_closed)
2051     socket->state = STATE_CLOSED;
2052   else
2053     socket->state = STATE_TRANSMIT_CLOSED;
2054   return GNUNET_OK;
2055 }
2056
2057
2058 /**
2059  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2060  *
2061  * @param cls the socket (set from GNUNET_MESH_connect)
2062  * @param tunnel connection to the other end
2063  * @param tunnel_ctx this is NULL
2064  * @param sender who sent the message
2065  * @param message the actual message
2066  * @param atsi performance data for the connection
2067  * @return GNUNET_OK to keep the connection open,
2068  *         GNUNET_SYSERR to close it (signal serious error)
2069  */
2070 static int
2071 client_handle_receive_close (void *cls,
2072                              struct GNUNET_MESH_Tunnel *tunnel,
2073                              void **tunnel_ctx,
2074                              const struct GNUNET_PeerIdentity *sender,
2075                              const struct GNUNET_MessageHeader *message,
2076                              const struct GNUNET_ATS_Information*atsi)
2077 {
2078   struct GNUNET_STREAM_Socket *socket = cls;
2079
2080   return
2081     handle_receive_close (socket,
2082                           tunnel,
2083                           sender,
2084                           (const struct GNUNET_MessageHeader *) message,
2085                           atsi);
2086 }
2087
2088
2089 /**
2090  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2091  *
2092  * @param cls the socket (set from GNUNET_MESH_connect)
2093  * @param tunnel connection to the other end
2094  * @param tunnel_ctx this is NULL
2095  * @param sender who sent the message
2096  * @param message the actual message
2097  * @param atsi performance data for the connection
2098  * @return GNUNET_OK to keep the connection open,
2099  *         GNUNET_SYSERR to close it (signal serious error)
2100  */
2101 static int
2102 client_handle_receive_close_ack (void *cls,
2103                                  struct GNUNET_MESH_Tunnel *tunnel,
2104                                  void **tunnel_ctx,
2105                                  const struct GNUNET_PeerIdentity *sender,
2106                                  const struct GNUNET_MessageHeader *message,
2107                                  const struct GNUNET_ATS_Information*atsi)
2108 {
2109   struct GNUNET_STREAM_Socket *socket = cls;
2110
2111   return handle_generic_close_ack (socket,
2112                                    tunnel,
2113                                    sender,
2114                                    (const struct GNUNET_MessageHeader *)
2115                                    message,
2116                                    atsi,
2117                                    SHUT_RD);
2118 }
2119
2120
2121 /**
2122  * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2123  *
2124  * @param socket the socket
2125  * @param tunnel connection to the other end
2126  * @param sender who sent the message
2127  * @param message the actual message
2128  * @param atsi performance data for the connection
2129  * @return GNUNET_OK to keep the connection open,
2130  *         GNUNET_SYSERR to close it (signal serious error)
2131  */
2132 static int
2133 handle_close (struct GNUNET_STREAM_Socket *socket,
2134               struct GNUNET_MESH_Tunnel *tunnel,
2135               const struct GNUNET_PeerIdentity *sender,
2136               const struct GNUNET_MessageHeader *message,
2137               const struct GNUNET_ATS_Information*atsi)
2138 {
2139   struct GNUNET_MessageHeader *close_ack;
2140
2141   switch (socket->state)
2142   {
2143   case STATE_INIT:
2144   case STATE_LISTEN:
2145   case STATE_HELLO_WAIT:
2146     LOG (GNUNET_ERROR_TYPE_DEBUG,
2147          "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2148          GNUNET_i2s (&socket->other_peer));
2149     return GNUNET_OK;
2150   default:
2151     break;
2152   }
2153   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2154        GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2155   close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
2156   close_ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2157   close_ack->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2158   queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2159   if ((STATE_CLOSED == socket->state) || (STATE_CLOSE_WAIT == socket->state))
2160     return GNUNET_OK;
2161   if (GNUNET_NO == socket->transmit_closed)
2162     do_transmit_shutdown (socket);
2163   if (GNUNET_NO == socket->receive_closed)
2164     do_receive_shutdown (socket);
2165   return GNUNET_OK;
2166 }
2167
2168
2169 /**
2170  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2171  *
2172  * @param cls the socket (set from GNUNET_MESH_connect)
2173  * @param tunnel connection to the other end
2174  * @param tunnel_ctx this is NULL
2175  * @param sender who sent the message
2176  * @param message the actual message
2177  * @param atsi performance data for the connection
2178  * @return GNUNET_OK to keep the connection open,
2179  *         GNUNET_SYSERR to close it (signal serious error)
2180  */
2181 static int
2182 client_handle_close (void *cls,
2183                      struct GNUNET_MESH_Tunnel *tunnel,
2184                      void **tunnel_ctx,
2185                      const struct GNUNET_PeerIdentity *sender,
2186                      const struct GNUNET_MessageHeader *message,
2187                      const struct GNUNET_ATS_Information*atsi)
2188 {
2189   struct GNUNET_STREAM_Socket *socket = cls;
2190
2191   return handle_close (socket,
2192                        tunnel,
2193                        sender,
2194                        (const struct GNUNET_MessageHeader *) message,
2195                        atsi);
2196 }
2197
2198
2199 /**
2200  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2201  *
2202  * @param cls the socket (set from GNUNET_MESH_connect)
2203  * @param tunnel connection to the other end
2204  * @param tunnel_ctx this is NULL
2205  * @param sender who sent the message
2206  * @param message the actual message
2207  * @param atsi performance data for the connection
2208  * @return GNUNET_OK to keep the connection open,
2209  *         GNUNET_SYSERR to close it (signal serious error)
2210  */
2211 static int
2212 client_handle_close_ack (void *cls,
2213                          struct GNUNET_MESH_Tunnel *tunnel,
2214                          void **tunnel_ctx,
2215                          const struct GNUNET_PeerIdentity *sender,
2216                          const struct GNUNET_MessageHeader *message,
2217                          const struct GNUNET_ATS_Information *atsi)
2218 {
2219   struct GNUNET_STREAM_Socket *socket = cls;
2220
2221   return handle_generic_close_ack (socket,
2222                                    tunnel,
2223                                    sender,
2224                                    (const struct GNUNET_MessageHeader *) 
2225                                    message,
2226                                    atsi,
2227                                    SHUT_RDWR);
2228 }
2229
2230 /*****************************/
2231 /* Server's Message Handlers */
2232 /*****************************/
2233
2234 /**
2235  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2236  *
2237  * @param cls the closure
2238  * @param tunnel connection to the other end
2239  * @param tunnel_ctx the socket
2240  * @param sender who sent the message
2241  * @param message the actual message
2242  * @param atsi performance data for the connection
2243  * @return GNUNET_OK to keep the connection open,
2244  *         GNUNET_SYSERR to close it (signal serious error)
2245  */
2246 static int
2247 server_handle_data (void *cls,
2248                     struct GNUNET_MESH_Tunnel *tunnel,
2249                     void **tunnel_ctx,
2250                     const struct GNUNET_PeerIdentity *sender,
2251                     const struct GNUNET_MessageHeader *message,
2252                     const struct GNUNET_ATS_Information*atsi)
2253 {
2254   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2255
2256   return handle_data (socket,
2257                       tunnel,
2258                       sender,
2259                       (const struct GNUNET_STREAM_DataMessage *)message,
2260                       atsi);
2261 }
2262
2263
2264 /**
2265  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2266  *
2267  * @param cls the closure
2268  * @param tunnel connection to the other end
2269  * @param tunnel_ctx the socket
2270  * @param sender who sent the message
2271  * @param message the actual message
2272  * @param atsi performance data for the connection
2273  * @return GNUNET_OK to keep the connection open,
2274  *         GNUNET_SYSERR to close it (signal serious error)
2275  */
2276 static int
2277 server_handle_hello (void *cls,
2278                      struct GNUNET_MESH_Tunnel *tunnel,
2279                      void **tunnel_ctx,
2280                      const struct GNUNET_PeerIdentity *sender,
2281                      const struct GNUNET_MessageHeader *message,
2282                      const struct GNUNET_ATS_Information*atsi)
2283 {
2284   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2285   const struct GNUNET_STREAM_HelloMessage *hello;
2286   struct GNUNET_STREAM_HelloAckMessage *reply;
2287   uint32_t port;
2288
2289   hello = (const struct GNUNET_STREAM_HelloMessage *) message;
2290   if (0 != memcmp (sender,
2291                    &socket->other_peer,
2292                    sizeof (struct GNUNET_PeerIdentity)))
2293   {
2294     LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
2295                GNUNET_i2s (&socket->other_peer));
2296     return GNUNET_YES;
2297   }
2298   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2299   GNUNET_assert (socket->tunnel == tunnel);
2300   LOG_DEBUG ("%1$s: Received HELLO from %1$s\n", 
2301              GNUNET_i2s (&socket->other_peer));
2302   port = (uint32_t) GNUNET_ntohll (hello->port);
2303   switch (socket->state)
2304   {
2305   case STATE_INIT:
2306     if (port != socket->port)
2307     {
2308       LOG_DEBUG ("Ignoring HELLO for port %u\n", port);
2309       GNUNET_MESH_tunnel_destroy (tunnel);
2310       GNUNET_free (socket);
2311       return GNUNET_OK;
2312     }
2313     reply = generate_hello_ack (socket, GNUNET_YES);
2314     queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2315                    GNUNET_NO);
2316     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2317                    socket->control_retransmission_task_id);
2318     socket->control_retransmission_task_id =
2319       GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2320                                     &control_retransmission_task, socket);
2321     break;
2322   case STATE_HELLO_WAIT:
2323     /* Perhaps our HELLO_ACK was lost */
2324     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
2325                    socket->control_retransmission_task_id);
2326     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2327     socket->control_retransmission_task_id =
2328       GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2329     break;
2330   default:
2331     LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2332                GNUNET_i2s (&socket->other_peer), socket->state);
2333     /* FIXME: Send RESET? */
2334   }
2335   return GNUNET_OK;
2336 }
2337
2338
2339 /**
2340  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2341  *
2342  * @param cls the closure
2343  * @param tunnel connection to the other end
2344  * @param tunnel_ctx the socket
2345  * @param sender who sent the message
2346  * @param message the actual message
2347  * @param atsi performance data for the connection
2348  * @return GNUNET_OK to keep the connection open,
2349  *         GNUNET_SYSERR to close it (signal serious error)
2350  */
2351 static int
2352 server_handle_hello_ack (void *cls,
2353                          struct GNUNET_MESH_Tunnel *tunnel,
2354                          void **tunnel_ctx,
2355                          const struct GNUNET_PeerIdentity *sender,
2356                          const struct GNUNET_MessageHeader *message,
2357                          const struct GNUNET_ATS_Information*atsi)
2358 {
2359   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2360   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2361
2362   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2363                  ntohs (message->type));
2364   GNUNET_assert (socket->tunnel == tunnel);
2365   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2366   switch (socket->state)  
2367   {
2368   case STATE_HELLO_WAIT:
2369     LOG (GNUNET_ERROR_TYPE_DEBUG,
2370          "%s: Received HELLO_ACK from %s\n",
2371          GNUNET_i2s (&socket->other_peer),
2372          GNUNET_i2s (&socket->other_peer));
2373     socket->read_sequence_number = ntohl (ack_message->sequence_number);
2374     LOG (GNUNET_ERROR_TYPE_DEBUG,
2375          "%s: Read sequence number %u\n",
2376          GNUNET_i2s (&socket->other_peer),
2377          (unsigned int) socket->read_sequence_number);
2378     socket->receiver_window_available = 
2379       ntohl (ack_message->receiver_window_size);
2380     set_state_established (NULL, socket);
2381     break;
2382   default:
2383     LOG (GNUNET_ERROR_TYPE_DEBUG,
2384          "Client sent HELLO_ACK when in state %d\n", socket->state);    
2385   }
2386   return GNUNET_OK;
2387 }
2388
2389
2390 /**
2391  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2392  *
2393  * @param cls the closure
2394  * @param tunnel connection to the other end
2395  * @param tunnel_ctx the socket
2396  * @param sender who sent the message
2397  * @param message the actual message
2398  * @param atsi performance data for the connection
2399  * @return GNUNET_OK to keep the connection open,
2400  *         GNUNET_SYSERR to close it (signal serious error)
2401  */
2402 static int
2403 server_handle_reset (void *cls,
2404                      struct GNUNET_MESH_Tunnel *tunnel,
2405                      void **tunnel_ctx,
2406                      const struct GNUNET_PeerIdentity *sender,
2407                      const struct GNUNET_MessageHeader *message,
2408                      const struct GNUNET_ATS_Information*atsi)
2409 {
2410   // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2411   /* FIXME */
2412   return GNUNET_OK;
2413 }
2414
2415
2416 /**
2417  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2418  *
2419  * @param cls the closure
2420  * @param tunnel connection to the other end
2421  * @param tunnel_ctx the socket
2422  * @param sender who sent the message
2423  * @param message the actual message
2424  * @param atsi performance data for the connection
2425  * @return GNUNET_OK to keep the connection open,
2426  *         GNUNET_SYSERR to close it (signal serious error)
2427  */
2428 static int
2429 server_handle_transmit_close (void *cls,
2430                               struct GNUNET_MESH_Tunnel *tunnel,
2431                               void **tunnel_ctx,
2432                               const struct GNUNET_PeerIdentity *sender,
2433                               const struct GNUNET_MessageHeader *message,
2434                               const struct GNUNET_ATS_Information*atsi)
2435 {
2436   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2437
2438   return handle_transmit_close (socket, tunnel, sender, message, atsi);
2439 }
2440
2441
2442 /**
2443  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2444  *
2445  * @param cls the closure
2446  * @param tunnel connection to the other end
2447  * @param tunnel_ctx the socket
2448  * @param sender who sent the message
2449  * @param message the actual message
2450  * @param atsi performance data for the connection
2451  * @return GNUNET_OK to keep the connection open,
2452  *         GNUNET_SYSERR to close it (signal serious error)
2453  */
2454 static int
2455 server_handle_transmit_close_ack (void *cls,
2456                                   struct GNUNET_MESH_Tunnel *tunnel,
2457                                   void **tunnel_ctx,
2458                                   const struct GNUNET_PeerIdentity *sender,
2459                                   const struct GNUNET_MessageHeader *message,
2460                                   const struct GNUNET_ATS_Information*atsi)
2461 {
2462   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2463
2464   return handle_generic_close_ack (socket, tunnel, sender, message, atsi,
2465                                    SHUT_WR);
2466 }
2467
2468
2469 /**
2470  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2471  *
2472  * @param cls the closure
2473  * @param tunnel connection to the other end
2474  * @param tunnel_ctx the socket
2475  * @param sender who sent the message
2476  * @param message the actual message
2477  * @param atsi performance data for the connection
2478  * @return GNUNET_OK to keep the connection open,
2479  *         GNUNET_SYSERR to close it (signal serious error)
2480  */
2481 static int
2482 server_handle_receive_close (void *cls,
2483                              struct GNUNET_MESH_Tunnel *tunnel,
2484                              void **tunnel_ctx,
2485                              const struct GNUNET_PeerIdentity *sender,
2486                              const struct GNUNET_MessageHeader *message,
2487                              const struct GNUNET_ATS_Information*atsi)
2488 {
2489   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2490
2491   return handle_receive_close (socket, tunnel, sender, message, atsi);
2492 }
2493
2494
2495 /**
2496  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2497  *
2498  * @param cls the closure
2499  * @param tunnel connection to the other end
2500  * @param tunnel_ctx the socket
2501  * @param sender who sent the message
2502  * @param message the actual message
2503  * @param atsi performance data for the connection
2504  * @return GNUNET_OK to keep the connection open,
2505  *         GNUNET_SYSERR to close it (signal serious error)
2506  */
2507 static int
2508 server_handle_receive_close_ack (void *cls,
2509                                  struct GNUNET_MESH_Tunnel *tunnel,
2510                                  void **tunnel_ctx,
2511                                  const struct GNUNET_PeerIdentity *sender,
2512                                  const struct GNUNET_MessageHeader *message,
2513                                  const struct GNUNET_ATS_Information*atsi)
2514 {
2515   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2516
2517   return handle_generic_close_ack (socket, tunnel, sender, message, atsi,
2518                                    SHUT_RD);
2519 }
2520
2521
2522 /**
2523  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2524  *
2525  * @param cls the listen socket (from GNUNET_MESH_connect in
2526  *          GNUNET_STREAM_listen) 
2527  * @param tunnel connection to the other end
2528  * @param tunnel_ctx the socket
2529  * @param sender who sent the message
2530  * @param message the actual message
2531  * @param atsi performance data for the connection
2532  * @return GNUNET_OK to keep the connection open,
2533  *         GNUNET_SYSERR to close it (signal serious error)
2534  */
2535 static int
2536 server_handle_close (void *cls,
2537                      struct GNUNET_MESH_Tunnel *tunnel,
2538                      void **tunnel_ctx,
2539                      const struct GNUNET_PeerIdentity *sender,
2540                      const struct GNUNET_MessageHeader *message,
2541                      const struct GNUNET_ATS_Information*atsi)
2542 {
2543   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2544   
2545   return handle_close (socket, tunnel, sender, message, atsi);
2546 }
2547
2548
2549 /**
2550  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2551  *
2552  * @param cls the closure
2553  * @param tunnel connection to the other end
2554  * @param tunnel_ctx the socket
2555  * @param sender who sent the message
2556  * @param message the actual message
2557  * @param atsi performance data for the connection
2558  * @return GNUNET_OK to keep the connection open,
2559  *         GNUNET_SYSERR to close it (signal serious error)
2560  */
2561 static int
2562 server_handle_close_ack (void *cls,
2563                          struct GNUNET_MESH_Tunnel *tunnel,
2564                          void **tunnel_ctx,
2565                          const struct GNUNET_PeerIdentity *sender,
2566                          const struct GNUNET_MessageHeader *message,
2567                          const struct GNUNET_ATS_Information*atsi)
2568 {
2569   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2570
2571   return handle_generic_close_ack (socket, tunnel, sender, message, atsi,
2572                                    SHUT_RDWR);
2573 }
2574
2575
2576 /**
2577  * Handler for DATA_ACK messages
2578  *
2579  * @param socket the socket through which the ack was received
2580  * @param tunnel connection to the other end
2581  * @param sender who sent the message
2582  * @param ack the acknowledgment message
2583  * @param atsi performance data for the connection
2584  * @return GNUNET_OK to keep the connection open,
2585  *         GNUNET_SYSERR to close it (signal serious error)
2586  */
2587 static int
2588 handle_ack (struct GNUNET_STREAM_Socket *socket,
2589             struct GNUNET_MESH_Tunnel *tunnel,
2590             const struct GNUNET_PeerIdentity *sender,
2591             const struct GNUNET_STREAM_AckMessage *ack,
2592             const struct GNUNET_ATS_Information*atsi)
2593 {
2594   struct GNUNET_STREAM_WriteHandle *write_handle;
2595   uint64_t ack_bitmap;
2596   unsigned int packet;
2597   int need_retransmission;
2598   uint32_t sequence_difference;
2599
2600   if (0 != memcmp (sender,
2601                    &socket->other_peer,
2602                    sizeof (struct GNUNET_PeerIdentity)))
2603   {
2604     LOG (GNUNET_ERROR_TYPE_DEBUG,
2605          "%s: Received ACK from non-confirming peer\n",
2606          GNUNET_i2s (&socket->other_peer));
2607     return GNUNET_YES;
2608   }
2609   switch (socket->state)
2610   {
2611   case (STATE_ESTABLISHED):
2612   case (STATE_RECEIVE_CLOSED):
2613   case (STATE_RECEIVE_CLOSE_WAIT):
2614     if (NULL == socket->write_handle)
2615     {
2616       LOG (GNUNET_ERROR_TYPE_DEBUG,
2617            "%s: Received DATA_ACK when write_handle is NULL\n",
2618            GNUNET_i2s (&socket->other_peer));
2619       return GNUNET_OK;
2620     }
2621     sequence_difference = 
2622         socket->write_sequence_number - ntohl (ack->base_sequence_number);
2623     if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2624     {
2625       LOG (GNUNET_ERROR_TYPE_DEBUG,
2626            "%s: Received DATA_ACK with unexpected base sequence number\n",
2627            GNUNET_i2s (&socket->other_peer));
2628       LOG (GNUNET_ERROR_TYPE_DEBUG,
2629            "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2630            GNUNET_i2s (&socket->other_peer),
2631            socket->write_sequence_number,
2632            ntohl (ack->base_sequence_number));
2633       return GNUNET_OK;
2634     }
2635     LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2636          GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2637     /* Cancel the retransmission task */
2638     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2639     {
2640       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2641       socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2642       socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
2643     }
2644     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2645     {
2646       if (NULL == socket->write_handle->messages[packet])
2647         break;
2648       /* BS: Base sequence from ack; PS: sequence num of current packet */
2649       sequence_difference = ntohl (ack->base_sequence_number)
2650         - ntohl (socket->write_handle->messages[packet]->sequence_number);
2651       if (0 == sequence_difference)
2652         break; /* The message in our handle is not yet received */
2653       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2654       /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2655       ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2656                             packet, GNUNET_YES);
2657     }
2658     if (((ntohl (ack->base_sequence_number)
2659          - (socket->write_handle->max_ack_base_num))
2660           <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2661     {
2662       socket->write_handle->max_ack_base_num = ntohl (ack->base_sequence_number);
2663       socket->receiver_window_available = 
2664           ntohl (ack->receive_window_remaining);
2665     }
2666     else
2667       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2668                   "Ignoring to modify receive window available as base: %u, max_ack_base: %u\n",
2669                   ntohl (ack->base_sequence_number),
2670                    socket->write_handle->max_ack_base_num);
2671     if ((packet == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2672         || ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2673             && (NULL == socket->write_handle->messages[packet])))
2674       goto call_write_cont_cb;
2675     GNUNET_assert (ntohl
2676                    (socket->write_handle->messages[packet]->sequence_number)
2677                    == ntohl (ack->base_sequence_number));
2678     /* Update our bitmap */
2679     ack_bitmap = GNUNET_ntohll (ack->bitmap);
2680     for (; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2681     {
2682       if (NULL == socket->write_handle->messages[packet]) break;
2683       if (ackbitmap_is_bit_set (&ack_bitmap, ntohl
2684                                 (socket->write_handle->messages[packet]->sequence_number)
2685                                 - ntohl (ack->base_sequence_number)))
2686         ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet, GNUNET_YES);
2687     }
2688     /* Check if we have received all acknowledgements */
2689     need_retransmission = GNUNET_NO;
2690     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2691     {
2692       if (NULL == socket->write_handle->messages[packet]) break;
2693       if (GNUNET_YES != ackbitmap_is_bit_set 
2694           (&socket->write_handle->ack_bitmap,packet))
2695       {
2696         need_retransmission = GNUNET_YES;
2697         break;
2698       }
2699     }
2700     if (GNUNET_YES == need_retransmission)
2701     {
2702       write_data (socket);
2703       return GNUNET_OK;
2704     }
2705
2706   call_write_cont_cb:
2707     /* Free the packets */
2708     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2709     {
2710       GNUNET_free_non_null (socket->write_handle->messages[packet]);
2711     }
2712     write_handle = socket->write_handle;
2713     socket->write_handle = NULL;
2714     if (NULL != write_handle->write_cont)
2715       write_handle->write_cont (write_handle->write_cont_cls,
2716                                 GNUNET_STREAM_OK,
2717                                 write_handle->size);
2718     /* We are done with the write handle - Freeing it */
2719     GNUNET_free (write_handle);
2720     LOG (GNUNET_ERROR_TYPE_DEBUG,
2721          "%s: Write completion callback completed\n",
2722          GNUNET_i2s (&socket->other_peer));      
2723     break;
2724   default:
2725     break;
2726   }
2727   return GNUNET_OK;
2728 }
2729
2730
2731 /**
2732  * Handler for DATA_ACK messages
2733  *
2734  * @param cls the 'struct GNUNET_STREAM_Socket'
2735  * @param tunnel connection to the other end
2736  * @param tunnel_ctx unused
2737  * @param sender who sent the message
2738  * @param message the actual message
2739  * @param atsi performance data for the connection
2740  * @return GNUNET_OK to keep the connection open,
2741  *         GNUNET_SYSERR to close it (signal serious error)
2742  */
2743 static int
2744 client_handle_ack (void *cls,
2745                    struct GNUNET_MESH_Tunnel *tunnel,
2746                    void **tunnel_ctx,
2747                    const struct GNUNET_PeerIdentity *sender,
2748                    const struct GNUNET_MessageHeader *message,
2749                    const struct GNUNET_ATS_Information*atsi)
2750 {
2751   struct GNUNET_STREAM_Socket *socket = cls;
2752   const struct GNUNET_STREAM_AckMessage *ack;
2753
2754   ack = (const struct GNUNET_STREAM_AckMessage *) message; 
2755   return handle_ack (socket, tunnel, sender, ack, atsi);
2756 }
2757
2758
2759 /**
2760  * Handler for DATA_ACK messages
2761  *
2762  * @param cls the server's listen socket
2763  * @param tunnel connection to the other end
2764  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2765  * @param sender who sent the message
2766  * @param message the actual message
2767  * @param atsi performance data for the connection
2768  * @return GNUNET_OK to keep the connection open,
2769  *         GNUNET_SYSERR to close it (signal serious error)
2770  */
2771 static int
2772 server_handle_ack (void *cls,
2773                    struct GNUNET_MESH_Tunnel *tunnel,
2774                    void **tunnel_ctx,
2775                    const struct GNUNET_PeerIdentity *sender,
2776                    const struct GNUNET_MessageHeader *message,
2777                    const struct GNUNET_ATS_Information*atsi)
2778 {
2779   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2780   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2781  
2782   return handle_ack (socket, tunnel, sender, ack, atsi);
2783 }
2784
2785
2786 /**
2787  * For client message handlers, the stream socket is in the
2788  * closure argument.
2789  */
2790 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2791   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2792   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2793    sizeof (struct GNUNET_STREAM_AckMessage) },
2794   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2795    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2796   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2797    sizeof (struct GNUNET_MessageHeader)},
2798   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2799    sizeof (struct GNUNET_MessageHeader)},
2800   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2801    sizeof (struct GNUNET_MessageHeader)},
2802   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2803    sizeof (struct GNUNET_MessageHeader)},
2804   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2805    sizeof (struct GNUNET_MessageHeader)},
2806   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2807    sizeof (struct GNUNET_MessageHeader)},
2808   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2809    sizeof (struct GNUNET_MessageHeader)},
2810   {NULL, 0, 0}
2811 };
2812
2813
2814 /**
2815  * For server message handlers, the stream socket is in the
2816  * tunnel context, and the listen socket in the closure argument.
2817  */
2818 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2819   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2820   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2821    sizeof (struct GNUNET_STREAM_AckMessage) },
2822   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2823    sizeof (struct GNUNET_STREAM_HelloMessage)},
2824   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2825    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2826   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2827    sizeof (struct GNUNET_MessageHeader)},
2828   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2829    sizeof (struct GNUNET_MessageHeader)},
2830   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2831    sizeof (struct GNUNET_MessageHeader)},
2832   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2833    sizeof (struct GNUNET_MessageHeader)},
2834   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2835    sizeof (struct GNUNET_MessageHeader)},
2836   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2837    sizeof (struct GNUNET_MessageHeader)},
2838   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2839    sizeof (struct GNUNET_MessageHeader)},
2840   {NULL, 0, 0}
2841 };
2842
2843
2844 /**
2845  * Function called when our target peer is connected to our tunnel
2846  *
2847  * @param cls the socket for which this tunnel is created
2848  * @param peer the peer identity of the target
2849  * @param atsi performance data for the connection
2850  */
2851 static void
2852 mesh_peer_connect_callback (void *cls,
2853                             const struct GNUNET_PeerIdentity *peer,
2854                             const struct GNUNET_ATS_Information * atsi)
2855 {
2856   struct GNUNET_STREAM_Socket *socket = cls;
2857   struct GNUNET_MessageHeader *message;
2858   
2859   if (0 != memcmp (peer,
2860                    &socket->other_peer,
2861                    sizeof (struct GNUNET_PeerIdentity)))
2862   {
2863     LOG (GNUNET_ERROR_TYPE_DEBUG,
2864          "%s: A peer which is not our target has connected to our tunnel\n",
2865          GNUNET_i2s(peer));
2866     return;
2867   }
2868   LOG (GNUNET_ERROR_TYPE_DEBUG,
2869        "%s: Target peer %s connected\n",
2870        GNUNET_i2s (&socket->other_peer),
2871        GNUNET_i2s (&socket->other_peer));
2872   /* Set state to INIT */
2873   socket->state = STATE_INIT;
2874   /* Send HELLO message */
2875   message = generate_hello (socket);
2876   queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2877   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2878     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2879   socket->control_retransmission_task_id =
2880     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2881                                   &control_retransmission_task, socket);
2882 }
2883
2884
2885 /**
2886  * Function called when our target peer is disconnected from our tunnel
2887  *
2888  * @param cls the socket associated which this tunnel
2889  * @param peer the peer identity of the target
2890  */
2891 static void
2892 mesh_peer_disconnect_callback (void *cls,
2893                                const struct GNUNET_PeerIdentity *peer)
2894 {
2895   struct GNUNET_STREAM_Socket *socket=cls;
2896   
2897   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2898   LOG_DEBUG ("%1$s: Other peer %1$s disconnected \n",
2899              GNUNET_i2s (&socket->other_peer));
2900 }
2901
2902
2903 /**
2904  * Method called whenever a peer creates a tunnel to us
2905  *
2906  * @param cls closure
2907  * @param tunnel new handle to the tunnel
2908  * @param initiator peer that started the tunnel
2909  * @param atsi performance information for the tunnel
2910  * @return initial tunnel context for the tunnel
2911  *         (can be NULL -- that's not an error)
2912  */
2913 static void *
2914 new_tunnel_notify (void *cls,
2915                    struct GNUNET_MESH_Tunnel *tunnel,
2916                    const struct GNUNET_PeerIdentity *initiator,
2917                    const struct GNUNET_ATS_Information *atsi)
2918 {
2919   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2920   struct GNUNET_STREAM_Socket *socket;
2921
2922   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2923      from the same peer again until the socket is closed */
2924   if (GNUNET_NO == lsocket->listening)
2925   {
2926     GNUNET_MESH_tunnel_destroy (tunnel);
2927     return NULL;
2928   }
2929   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2930   socket->other_peer = *initiator;
2931   socket->tunnel = tunnel;
2932   socket->state = STATE_INIT;
2933   socket->lsocket = lsocket;
2934   socket->port = lsocket->port;
2935   socket->stat_handle = lsocket->stat_handle;
2936   socket->retransmit_timeout = lsocket->retransmit_timeout;
2937   socket->testing_active = lsocket->testing_active;
2938   socket->testing_set_write_sequence_number_value =
2939       lsocket->testing_set_write_sequence_number_value;
2940   socket->max_payload_size = lsocket->max_payload_size;
2941   LOG_DEBUG ("%1$s: Peer %1$s initiated tunnel to us\n",
2942              GNUNET_i2s (&socket->other_peer));
2943   if (NULL != socket->stat_handle)
2944   {
2945     GNUNET_STATISTICS_update (socket->stat_handle,
2946                               "total inbound connections received",
2947                               1, GNUNET_NO);
2948     GNUNET_STATISTICS_update (socket->stat_handle,
2949                               "inbound connections", 1, GNUNET_NO);
2950   }
2951   return socket;
2952 }
2953
2954
2955 /**
2956  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2957  * any associated state.  This function is NOT called if the client has
2958  * explicitly asked for the tunnel to be destroyed using
2959  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2960  * the tunnel.
2961  *
2962  * @param cls closure (set from GNUNET_MESH_connect)
2963  * @param tunnel connection to the other end (henceforth invalid)
2964  * @param tunnel_ctx place where local state associated
2965  *                   with the tunnel is stored
2966  */
2967 static void 
2968 tunnel_cleaner (void *cls,
2969                 const struct GNUNET_MESH_Tunnel *tunnel,
2970                 void *tunnel_ctx)
2971 {
2972   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2973   struct MessageQueue *head;
2974
2975   GNUNET_assert (tunnel == socket->tunnel);
2976   GNUNET_break_op(0);
2977   LOG (GNUNET_ERROR_TYPE_DEBUG,
2978        "%s: Peer %s has terminated connection abruptly\n",
2979        GNUNET_i2s (&socket->other_peer),
2980        GNUNET_i2s (&socket->other_peer));
2981   if (NULL != socket->stat_handle)
2982   {
2983     GNUNET_STATISTICS_update (socket->stat_handle,
2984                               "connections terminated abruptly", 1, GNUNET_NO);
2985     GNUNET_STATISTICS_update (socket->stat_handle,
2986                               "inbound connections", -1, GNUNET_NO);
2987   }
2988   /* Clear Transmit handles */
2989   if (NULL != socket->transmit_handle)
2990   {
2991     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2992     socket->transmit_handle = NULL;
2993   }
2994   /* Stop Tasks using socket->tunnel */
2995   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2996   {
2997     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2998     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2999   }
3000   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3001   {
3002     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3003     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3004   }  
3005   /* Terminate the control retransmission tasks */
3006   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3007   {
3008     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3009     socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3010   }
3011   /* Clear existing message queue */
3012   while (NULL != (head = socket->queue_head)) {
3013     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3014                                  socket->queue_tail,
3015                                  head);
3016     GNUNET_free (head->message);
3017     GNUNET_free (head);
3018   }
3019   socket->tunnel = NULL;
3020 }
3021
3022
3023 /**
3024  * Callback to signal timeout on lockmanager lock acquire
3025  *
3026  * @param cls the ListenSocket
3027  * @param tc the scheduler task context
3028  */
3029 static void
3030 lockmanager_acquire_timeout (void *cls, 
3031                              const struct GNUNET_SCHEDULER_TaskContext *tc)
3032 {
3033   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
3034   GNUNET_STREAM_ListenCallback listen_cb;
3035   void *listen_cb_cls;
3036
3037   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
3038   listen_cb = lsocket->listen_cb;
3039   listen_cb_cls = lsocket->listen_cb_cls;
3040   if (NULL != listen_cb)
3041     listen_cb (listen_cb_cls, NULL, NULL);
3042 }
3043
3044
3045 /**
3046  * Callback to notify us on the status changes on app_port lock
3047  *
3048  * @param cls the ListenSocket
3049  * @param domain the domain name of the lock
3050  * @param lock the app_port
3051  * @param status the current status of the lock
3052  */
3053 static void
3054 lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
3055                        enum GNUNET_LOCKMANAGER_Status status)
3056 {
3057   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
3058
3059   GNUNET_assert (lock == (uint32_t) lsocket->port);
3060   if (GNUNET_LOCKMANAGER_SUCCESS == status)
3061   {
3062     lsocket->listening = GNUNET_YES;
3063     if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3064     {
3065       GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3066       lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
3067     }
3068     if (NULL == lsocket->mesh)
3069     {
3070       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
3071
3072       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
3073                                            lsocket, /* Closure */
3074                                            &new_tunnel_notify,
3075                                            &tunnel_cleaner,
3076                                            server_message_handlers,
3077                                            ports);
3078       GNUNET_assert (NULL != lsocket->mesh);
3079       if (NULL != lsocket->listen_ok_cb)
3080       {
3081         (void) lsocket->listen_ok_cb ();
3082       }
3083     }
3084   }
3085   if (GNUNET_LOCKMANAGER_RELEASE == status)
3086     lsocket->listening = GNUNET_NO;
3087 }
3088
3089
3090 /*****************/
3091 /* API functions */
3092 /*****************/
3093
3094
3095 /**
3096  * Tries to open a stream to the target peer
3097  *
3098  * @param cfg configuration to use
3099  * @param target the target peer to which the stream has to be opened
3100  * @param app_port the application port number which uniquely identifies this
3101  *            stream
3102  * @param open_cb this function will be called after stream has be established;
3103  *          cannot be NULL
3104  * @param open_cb_cls the closure for open_cb
3105  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3106  * @return if successful it returns the stream socket; NULL if stream cannot be
3107  *         opened 
3108  */
3109 struct GNUNET_STREAM_Socket *
3110 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
3111                     const struct GNUNET_PeerIdentity *target,
3112                     GNUNET_MESH_ApplicationType app_port,
3113                     GNUNET_STREAM_OpenCallback open_cb,
3114                     void *open_cb_cls,
3115                     ...)
3116 {
3117   struct GNUNET_STREAM_Socket *socket;
3118   enum GNUNET_STREAM_Option option;
3119   va_list vargs;
3120   uint16_t payload_size;
3121
3122   LOG (GNUNET_ERROR_TYPE_DEBUG,
3123        "%s\n", __func__);
3124   GNUNET_assert (NULL != open_cb);
3125   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
3126   socket->other_peer = *target;
3127   socket->open_cb = open_cb;
3128   socket->open_cls = open_cb_cls;
3129   socket->port = app_port;
3130   /* Set defaults */
3131   socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3132   socket->testing_active = GNUNET_NO;
3133   socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3134   va_start (vargs, open_cb_cls); /* Parse variable args */
3135   do {
3136     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3137     switch (option)
3138     {
3139     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3140       /* Expect struct GNUNET_TIME_Relative */
3141       socket->retransmit_timeout = va_arg (vargs,
3142                                            struct GNUNET_TIME_Relative);
3143       break;
3144     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3145       socket->testing_active = GNUNET_YES;
3146       socket->testing_set_write_sequence_number_value = va_arg (vargs,
3147                                                                 uint32_t);
3148       break;
3149     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3150       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3151       break;
3152     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3153       GNUNET_break (0);          /* Option irrelevant in STREAM_open */
3154       break;
3155     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3156       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3157       GNUNET_assert (0 != payload_size);
3158       if (payload_size < socket->max_payload_size)
3159         socket->max_payload_size = payload_size;
3160       break;
3161     case GNUNET_STREAM_OPTION_END:
3162       break;
3163     }
3164   } while (GNUNET_STREAM_OPTION_END != option);
3165   va_end (vargs);               /* End of variable args parsing */
3166   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3167                                       socket, /* cls */
3168                                       NULL, /* No inbound tunnel handler */
3169                                       NULL, /* No in-tunnel cleaner */
3170                                       client_message_handlers,
3171                                       NULL); /* We don't get inbound tunnels */
3172   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
3173   {
3174     GNUNET_free (socket);
3175     return NULL;
3176   }
3177   /* Now create the mesh tunnel to target */
3178   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3179   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3180                                               NULL, /* Tunnel context */
3181                                               &mesh_peer_connect_callback,
3182                                               &mesh_peer_disconnect_callback,
3183                                               socket);
3184   GNUNET_assert (NULL != socket->tunnel);
3185   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
3186                                         &socket->other_peer);
3187   socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3188   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3189   return socket;
3190 }
3191
3192
3193 /**
3194  * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3195  *
3196  * @param socket the stream socket
3197  * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3198  * @param completion_cb the callback that will be called upon successful
3199  *          shutdown of given operation
3200  * @param completion_cls the closure for the completion callback
3201  * @return the shutdown handle
3202  */
3203 struct GNUNET_STREAM_ShutdownHandle *
3204 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3205                         int operation,
3206                         GNUNET_STREAM_ShutdownCompletion completion_cb,
3207                         void *completion_cls)
3208 {
3209   struct GNUNET_STREAM_ShutdownHandle *handle;
3210   struct GNUNET_MessageHeader *msg;
3211   
3212   GNUNET_assert (NULL == socket->shutdown_handle);
3213   handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3214   handle->socket = socket;
3215   handle->completion_cb = completion_cb;
3216   handle->completion_cls = completion_cls;
3217   socket->shutdown_handle = handle;
3218   if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3219        || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3220        || ((GNUNET_YES == socket->transmit_closed) 
3221            && (GNUNET_YES == socket->receive_closed)
3222            && (SHUT_RDWR == operation)) )
3223   {
3224     handle->operation = operation;
3225     handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3226                                                           socket);
3227     return handle;
3228   }
3229   msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
3230   msg->size = htons (sizeof (struct GNUNET_MessageHeader));
3231   switch (operation)
3232   {
3233   case SHUT_RD:
3234     handle->operation = SHUT_RD;
3235     if (NULL != socket->read_handle)
3236       LOG (GNUNET_ERROR_TYPE_WARNING,
3237            "Existing read handle should be cancelled before shutting"
3238            " down reading\n");
3239     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3240     queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3241                    GNUNET_NO);
3242     socket->receive_closed = GNUNET_YES;
3243     break;
3244   case SHUT_WR:
3245     handle->operation = SHUT_WR;
3246     if (NULL != socket->write_handle)
3247       LOG (GNUNET_ERROR_TYPE_WARNING,
3248            "Existing write handle should be cancelled before shutting"
3249            " down writing\n");
3250     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3251     queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3252                    GNUNET_NO);
3253     socket->transmit_closed = GNUNET_YES;
3254     break;
3255   case SHUT_RDWR:
3256     handle->operation = SHUT_RDWR;
3257     if (NULL != socket->write_handle)
3258       LOG (GNUNET_ERROR_TYPE_WARNING,
3259            "Existing write handle should be cancelled before shutting"
3260            " down writing\n");
3261     if (NULL != socket->read_handle)
3262       LOG (GNUNET_ERROR_TYPE_WARNING,
3263            "Existing read handle should be cancelled before shutting"
3264            " down reading\n");
3265     msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3266     queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3267     socket->transmit_closed = GNUNET_YES;
3268     socket->receive_closed = GNUNET_YES;
3269     break;
3270   default:
3271     LOG (GNUNET_ERROR_TYPE_WARNING,
3272          "GNUNET_STREAM_shutdown called with invalid value for "
3273          "parameter operation -- Ignoring\n");
3274     GNUNET_free (msg);
3275     GNUNET_free (handle);
3276     return NULL;
3277   }
3278   handle->close_msg_retransmission_task_id =
3279     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3280                                   &close_msg_retransmission_task,
3281                                   handle);
3282   return handle;
3283 }
3284
3285
3286 /**
3287  * Cancels a pending shutdown. Note that the shutdown messages may already be
3288  * sent and the stream is shutdown already for the operation given to
3289  * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3290  * shutdown messages and frees the shutdown handle.
3291  *
3292  * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3293  */
3294 void
3295 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3296 {
3297   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3298     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3299   if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3300     GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3301   handle->socket->shutdown_handle = NULL;
3302   GNUNET_free (handle);
3303 }
3304
3305
3306 /**
3307  * Closes the stream
3308  *
3309  * @param socket the stream socket
3310  */
3311 void
3312 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3313 {
3314   struct MessageQueue *head;
3315
3316   if (NULL != socket->read_handle)
3317   {
3318     LOG (GNUNET_ERROR_TYPE_WARNING,
3319          "Closing STREAM socket when a read handle is pending\n");
3320     GNUNET_STREAM_read_cancel (socket->read_handle);
3321   }
3322   if (NULL != socket->write_handle)
3323   {
3324     LOG (GNUNET_ERROR_TYPE_WARNING,
3325          "Closing STREAM socket when a write handle is pending\n");
3326     GNUNET_STREAM_write_cancel (socket->write_handle);
3327     //socket->write_handle = NULL;
3328   }
3329   /* Terminate the ack'ing task if they are still present */
3330   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3331   {
3332     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3333     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3334   }
3335   /* Terminate the control retransmission tasks */
3336   if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3337   {
3338     GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3339   }
3340   /* Clear Transmit handles */
3341   if (NULL != socket->transmit_handle)
3342   {
3343     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3344     socket->transmit_handle = NULL;
3345   }
3346   /* Clear existing message queue */
3347   while (NULL != (head = socket->queue_head)) {
3348     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3349                                  socket->queue_tail,
3350                                  head);
3351     GNUNET_free (head->message);
3352     GNUNET_free (head);
3353   }
3354   /* Close associated tunnel */
3355   if (NULL != socket->tunnel)
3356   {
3357     GNUNET_MESH_tunnel_destroy (socket->tunnel);
3358     socket->tunnel = NULL;
3359   }
3360   /* Close mesh connection */
3361   if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3362   {
3363     GNUNET_MESH_disconnect (socket->mesh);
3364     socket->mesh = NULL;
3365   }
3366   /* Close statistics connection */
3367   if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3368     GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3369   /* Release receive buffer */
3370   if (NULL != socket->receive_buffer)
3371   {
3372     GNUNET_free (socket->receive_buffer);
3373   }
3374   GNUNET_free (socket);
3375 }
3376
3377
3378 /**
3379  * Listens for stream connections for a specific application ports
3380  *
3381  * @param cfg the configuration to use
3382  * @param app_port the application port for which new streams will be accepted
3383  * @param listen_cb this function will be called when a peer tries to establish
3384  *            a stream with us
3385  * @param listen_cb_cls closure for listen_cb
3386  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3387  * @return listen socket, NULL for any error
3388  */
3389 struct GNUNET_STREAM_ListenSocket *
3390 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3391                       GNUNET_MESH_ApplicationType app_port,
3392                       GNUNET_STREAM_ListenCallback listen_cb,
3393                       void *listen_cb_cls,
3394                       ...)
3395 {
3396   struct GNUNET_STREAM_ListenSocket *lsocket;
3397   struct GNUNET_TIME_Relative listen_timeout;
3398   enum GNUNET_STREAM_Option option;
3399   va_list vargs;
3400   uint16_t payload_size;
3401
3402   GNUNET_assert (NULL != listen_cb);
3403   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3404   lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3405   lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3406   if (NULL == lsocket->lockmanager)
3407   {
3408     GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3409     GNUNET_free (lsocket);
3410     return NULL;
3411   }
3412   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
3413   /* Set defaults */
3414   lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3415   lsocket->testing_active = GNUNET_NO;
3416   lsocket->listen_ok_cb = NULL;
3417   lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3418   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
3419   va_start (vargs, listen_cb_cls);
3420   do {
3421     option = va_arg (vargs, enum GNUNET_STREAM_Option);
3422     switch (option)
3423     {
3424     case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3425       lsocket->retransmit_timeout = va_arg (vargs,
3426                                             struct GNUNET_TIME_Relative);
3427       break;
3428     case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3429       lsocket->testing_active = GNUNET_YES;
3430       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3431                                                                  uint32_t);
3432       break;
3433     case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3434       listen_timeout = GNUNET_TIME_relative_multiply
3435         (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3436       break;
3437     case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3438       lsocket->listen_ok_cb = va_arg (vargs,
3439                                       GNUNET_STREAM_ListenSuccessCallback);
3440       break;
3441     case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3442       payload_size = (uint16_t) va_arg (vargs, unsigned int);
3443       GNUNET_assert (0 != payload_size);
3444       if (payload_size < lsocket->max_payload_size)
3445         lsocket->max_payload_size = payload_size;
3446       break;
3447     case GNUNET_STREAM_OPTION_END:
3448       break;
3449     }
3450   } while (GNUNET_STREAM_OPTION_END != option);
3451   va_end (vargs);
3452   lsocket->port = app_port;
3453   lsocket->listen_cb = listen_cb;
3454   lsocket->listen_cb_cls = listen_cb_cls;
3455   lsocket->locking_request = 
3456     GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3457                                      (uint32_t) lsocket->port,
3458                                      &lock_status_change_cb, lsocket);
3459   lsocket->lockmanager_acquire_timeout_task =
3460     GNUNET_SCHEDULER_add_delayed (listen_timeout,
3461                                   &lockmanager_acquire_timeout, lsocket);
3462   lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3463                                                    lsocket->cfg);
3464   return lsocket;
3465 }
3466
3467
3468 /**
3469  * Closes the listen socket
3470  *
3471  * @param lsocket the listen socket
3472  */
3473 void
3474 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3475 {
3476   /* Close MESH connection */
3477   if (NULL != lsocket->mesh)
3478     GNUNET_MESH_disconnect (lsocket->mesh);
3479   if (NULL != lsocket->stat_handle)
3480     GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3481   GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3482   if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3483     GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3484   if (NULL != lsocket->locking_request)
3485     GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3486   if (NULL != lsocket->lockmanager)
3487     GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3488   GNUNET_free (lsocket);
3489 }
3490
3491
3492 /**
3493  * Tries to write the given data to the stream. The maximum size of data that
3494  * can be written per a write operation is ~ 4MB (64 * (64000 - sizeof (struct
3495  * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3496  * violation, however only the said number of maximum bytes will be written.
3497  *
3498  * @param socket the socket representing a stream
3499  * @param data the data buffer from where the data is written into the stream
3500  * @param size the number of bytes to be written from the data buffer
3501  * @param timeout the timeout period
3502  * @param write_cont the function to call upon writing some bytes into the
3503  *          stream 
3504  * @param write_cont_cls the closure
3505  *
3506  * @return handle to cancel the operation; if a previous write is pending NULL
3507  *           is returned. If the stream has been shutdown for this operation or
3508  *           is broken then write_cont is immediately called and NULL is
3509  *           returned.
3510  */
3511 struct GNUNET_STREAM_WriteHandle *
3512 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3513                      const void *data,
3514                      size_t size,
3515                      struct GNUNET_TIME_Relative timeout,
3516                      GNUNET_STREAM_CompletionContinuation write_cont,
3517                      void *write_cont_cls)
3518 {
3519   struct GNUNET_STREAM_WriteHandle *io_handle;
3520   struct GNUNET_STREAM_DataMessage *dmsg;
3521   const void *sweep;
3522   struct GNUNET_TIME_Relative ack_deadline;
3523   unsigned int num_needed_packets;
3524   unsigned int cnt;
3525   uint32_t packet_size;
3526   uint32_t payload_size;
3527   uint16_t max_data_packet_size;
3528
3529   LOG (GNUNET_ERROR_TYPE_DEBUG,
3530        "%s\n", __func__);
3531   if (NULL != socket->write_handle)
3532   {
3533     GNUNET_break (0);
3534     return NULL;
3535   }
3536   if (NULL == socket->tunnel)
3537   {
3538     if (NULL != write_cont)
3539       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3540     return NULL;
3541   }
3542   switch (socket->state)
3543   {
3544   case STATE_TRANSMIT_CLOSED:
3545   case STATE_TRANSMIT_CLOSE_WAIT:
3546   case STATE_CLOSED:
3547   case STATE_CLOSE_WAIT:
3548     if (NULL != write_cont)
3549       write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3550     LOG (GNUNET_ERROR_TYPE_DEBUG,
3551          "%s() END\n", __func__);
3552     return NULL;
3553   case STATE_INIT:
3554   case STATE_LISTEN:
3555   case STATE_HELLO_WAIT:
3556     if (NULL != write_cont)
3557       write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3558     LOG (GNUNET_ERROR_TYPE_DEBUG,
3559          "%s() END\n", __func__);
3560     return NULL;
3561   case STATE_ESTABLISHED:
3562   case STATE_RECEIVE_CLOSED:
3563   case STATE_RECEIVE_CLOSE_WAIT:
3564     break;
3565   }
3566   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3567     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
3568   num_needed_packets =
3569       (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3570   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_WriteHandle));
3571   io_handle->socket = socket;
3572   io_handle->write_cont = write_cont;
3573   io_handle->write_cont_cls = write_cont_cls;
3574   io_handle->size = size;
3575   io_handle->packets_sent = 0;
3576   sweep = data;
3577   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3578      determined from RTT */
3579   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3580   /* Divide the given buffer into packets for sending */
3581   max_data_packet_size =
3582       socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3583   io_handle->max_ack_base_num = socket->write_sequence_number;
3584   for (cnt=0; cnt < num_needed_packets; cnt++)
3585   {
3586     if ((cnt + 1) * socket->max_payload_size < size) 
3587     {
3588       payload_size = socket->max_payload_size;
3589       packet_size = max_data_packet_size;
3590     }
3591     else 
3592     {
3593       payload_size = size - (cnt * socket->max_payload_size);
3594       packet_size = payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3595     }
3596     dmsg = GNUNET_malloc (packet_size);    
3597     dmsg->header.size = htons (packet_size);
3598     dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3599     dmsg->sequence_number = htonl (socket->write_sequence_number++);
3600     dmsg->offset = htonl (socket->write_offset);
3601     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3602        determined from RTT */
3603     dmsg->ack_deadline = GNUNET_TIME_relative_hton (ack_deadline);
3604     /* Copy data from given buffer to the packet */
3605     memcpy (&dmsg[1], sweep, payload_size);
3606     io_handle->messages[cnt] = dmsg;
3607     sweep += payload_size;
3608     socket->write_offset += payload_size;
3609   }
3610   /* ack the last data message. FIXME: remove when we figure out how to do this
3611      using RTT */
3612   io_handle->messages[num_needed_packets - 1]->ack_deadline = 
3613       GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3614   socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
3615   socket->write_handle = io_handle;
3616   write_data (socket);
3617   LOG (GNUNET_ERROR_TYPE_DEBUG,
3618        "%s() END\n", __func__);
3619   return io_handle;
3620 }
3621
3622
3623 /**
3624  * Function to check the ACK bitmap for any received messages and call the data processor
3625  *
3626  * @param cls the socket
3627  * @param tc the scheduler task context
3628  */
3629 static void
3630 probe_data_availability (void *cls,
3631                          const struct GNUNET_SCHEDULER_TaskContext *tc)
3632 {
3633   struct GNUNET_STREAM_Socket *socket = cls;
3634
3635   GNUNET_assert (NULL != socket->read_handle);
3636   socket->read_handle->probe_data_availability_task_id =
3637       GNUNET_SCHEDULER_NO_TASK;
3638   if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id)
3639     return;     /* A task to call read processor is present */
3640   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3641                                           0))
3642     socket->read_handle->read_task_id 
3643         = GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
3644 }
3645
3646
3647 /**
3648  * Tries to read data from the stream. Should not be called when another read
3649  * handle is present; the existing read handle should be canceled with
3650  * GNUNET_STREAM_read_cancel(). Only one read handle per socket is present at
3651  * any time
3652  *
3653  * @param socket the socket representing a stream
3654  * @param timeout the timeout period
3655  * @param proc function to call with data (once only)
3656  * @param proc_cls the closure for proc
3657  * @return handle to cancel the operation; NULL is returned if the stream has
3658  *           been shutdown for this type of opeartion (the DataProcessor is
3659  *           immediately called with GNUNET_STREAM_SHUTDOWN as status)
3660  */
3661 struct GNUNET_STREAM_ReadHandle *
3662 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3663                     struct GNUNET_TIME_Relative timeout,
3664                     GNUNET_STREAM_DataProcessor proc,
3665                     void *proc_cls)
3666 {
3667   struct GNUNET_STREAM_ReadHandle *read_handle;
3668   
3669   LOG (GNUNET_ERROR_TYPE_DEBUG,
3670        "%s: %s()\n", 
3671        GNUNET_i2s (&socket->other_peer),
3672        __func__);
3673   /* Only one read handle is permitted at any time; cancel the existing or wait
3674      for it to complete */
3675   GNUNET_assert (NULL == socket->read_handle);
3676   GNUNET_assert (NULL != proc);
3677   if (GNUNET_YES == socket->receive_closed)
3678     return NULL;
3679   switch (socket->state)
3680   {
3681   case STATE_RECEIVE_CLOSED:
3682   case STATE_RECEIVE_CLOSE_WAIT:
3683   case STATE_CLOSED:
3684   case STATE_CLOSE_WAIT:
3685     LOG (GNUNET_ERROR_TYPE_DEBUG,
3686          "%s: %s() END\n",
3687          GNUNET_i2s (&socket->other_peer),
3688          __func__);
3689     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3690     return NULL;
3691   default:
3692     break;
3693   }
3694   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ReadHandle));
3695   read_handle->proc = proc;
3696   read_handle->proc_cls = proc_cls;
3697   read_handle->socket = socket;
3698   socket->read_handle = read_handle;
3699   read_handle->probe_data_availability_task_id =
3700       GNUNET_SCHEDULER_add_now (&probe_data_availability, socket);
3701   read_handle->read_io_timeout_task_id =
3702       GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3703   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3704        GNUNET_i2s (&socket->other_peer), __func__);
3705   return read_handle;
3706 }
3707
3708
3709 /**
3710  * Cancels pending write operation. Also cancels packet retransmissions which
3711  * may have resulted otherwise.
3712  *
3713  * CAUTION: Normally a write operation is considered successful if the data
3714  * given to it is sent and acknowledged by the receiver. As data is divided
3715  * into packets, it is possible that not all packets are received by the
3716  * receiver. Any missing packets are then retransmitted till the receiver
3717  * acknowledges all packets or until a timeout . During this scenario if the
3718  * write operation is cancelled all such retransmissions are also
3719  * cancelled. This may leave the receiver's receive buffer incompletely filled
3720  * as some missing packets are never retransmitted. So this operation should be
3721  * used before shutting down transmission from our side or before closing the
3722  * socket.
3723  *
3724  * @param wh write operation handle to cancel
3725  */
3726 void
3727 GNUNET_STREAM_write_cancel (struct GNUNET_STREAM_WriteHandle *wh)
3728 {
3729   struct GNUNET_STREAM_Socket *socket = wh->socket;
3730   unsigned int packet;
3731
3732   GNUNET_assert (NULL != socket->write_handle);
3733   GNUNET_assert (socket->write_handle == wh);
3734   if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3735   {
3736     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3737     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3738   }
3739   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3740   {
3741     if (NULL == wh->messages[packet]) break;
3742     GNUNET_free (wh->messages[packet]);
3743   }      
3744   GNUNET_free (socket->write_handle);
3745   socket->write_handle = NULL;
3746 }
3747
3748
3749 /**
3750  * Cancel pending read operation.
3751  *
3752  * @param rh read operation handle to cancel
3753  */
3754 void
3755 GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
3756 {
3757   struct GNUNET_STREAM_Socket *socket;
3758   
3759   socket = rh->socket;
3760   GNUNET_assert (NULL != socket->read_handle);
3761   GNUNET_assert (rh == socket->read_handle);
3762   cleanup_read_handle (socket);
3763 }
3764
3765
3766 /**
3767  * Functions of this signature are called whenever writing operations
3768  * on a stream are executed
3769  *
3770  * @param cls the closure from GNUNET_STREAM_write
3771  * @param status the status of the stream at the time this function is called;
3772  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
3773  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
3774  *          (this doesn't mean that the data is never sent, the receiver may
3775  *          have read the data but its ACKs may have been lost);
3776  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
3777  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
3778  *          be processed.
3779  * @param size the number of bytes written
3780  */
3781 static void 
3782 mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
3783 {
3784   struct GNUNET_MQ_MessageQueue *mq = cls;
3785   struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3786   struct GNUNET_MQ_Message *mqm;
3787
3788   switch (status)
3789   {
3790     case GNUNET_STREAM_OK:
3791       break;
3792     case GNUNET_STREAM_SHUTDOWN:
3793       /* FIXME: call shutdown handler */
3794       return;
3795     case GNUNET_STREAM_TIMEOUT:
3796       if (NULL == mq->error_handler)
3797         LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n");
3798       else
3799         mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3800       return;
3801     case GNUNET_STREAM_SYSERR:
3802       if (NULL == mq->error_handler)
3803         LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n");
3804       else
3805         mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
3806       return;
3807     default:
3808       GNUNET_assert (0);
3809       return;
3810   }
3811   
3812   /* call cb for message we finished sending */
3813   mqm = mq->current_msg;
3814   GNUNET_assert (NULL != mq->current_msg);
3815   if (NULL != mqm->sent_cb)
3816     mqm->sent_cb (mqm->sent_cls);
3817   GNUNET_free (mqm);
3818
3819   mss->wh = NULL;
3820
3821   mqm = mq->msg_head;
3822   mq->current_msg = mqm;
3823   if (NULL == mqm)
3824     return;
3825   GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
3826   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3827                                  GNUNET_TIME_UNIT_FOREVER_REL,
3828                                  mq_stream_write_queued, mq);
3829   GNUNET_assert (NULL != mss->wh);
3830 }
3831
3832
3833 static void
3834 mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
3835                      struct GNUNET_MQ_Message *mqm)
3836 {
3837   struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3838
3839   if (NULL != mq->current_msg)
3840   {
3841     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
3842     return;
3843   }
3844   mq->current_msg = mqm;
3845   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3846                                  GNUNET_TIME_UNIT_FOREVER_REL,
3847                                  mq_stream_write_queued, mq);
3848 }
3849
3850
3851 /**
3852  * Functions with this signature are called whenever a
3853  * complete message is received by the tokenizer.
3854  *
3855  * Do not call GNUNET_SERVER_mst_destroy in callback
3856  *
3857  * @param cls closure
3858  * @param client identification of the client
3859  * @param message the actual message
3860  *
3861  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
3862  */
3863 static int
3864 mq_stream_mst_callback (void *cls, void *client,
3865                      const struct GNUNET_MessageHeader *message)
3866 {
3867   struct GNUNET_MQ_MessageQueue *mq = cls;
3868
3869   GNUNET_assert (NULL != message);
3870   GNUNET_MQ_dispatch (mq, message);
3871   return GNUNET_OK;
3872 }
3873
3874
3875 /**
3876  * Functions of this signature are called whenever data is available from the
3877  * stream.
3878  *
3879  * @param cls the closure from GNUNET_STREAM_read
3880  * @param status the status of the stream at the time this function is called
3881  * @param data traffic from the other side
3882  * @param size the number of bytes available in data read; will be 0 on timeout 
3883  * @return number of bytes of processed from 'data' (any data remaining should be
3884  *         given to the next time the read processor is called).
3885  */
3886 static size_t
3887 mq_stream_data_processor (void *cls,
3888                           enum GNUNET_STREAM_Status status,
3889                           const void *data,
3890                           size_t size)
3891 {
3892   struct GNUNET_MQ_MessageQueue *mq = cls;
3893   struct MQStreamState *mss;
3894   int ret;
3895
3896   switch (status)
3897   {
3898     case GNUNET_STREAM_OK:
3899       break;
3900     case GNUNET_STREAM_SHUTDOWN:
3901       /* FIXME: call shutdown handler */
3902       return 0;
3903     case GNUNET_STREAM_TIMEOUT:
3904       if (NULL == mq->error_handler)
3905         LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n");
3906       else
3907         mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3908       return 0;
3909     case GNUNET_STREAM_SYSERR:
3910       if (NULL == mq->error_handler)
3911         LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n");
3912       else
3913         mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3914       return 0;
3915     default:
3916       GNUNET_assert (0);
3917       return 0;
3918   }
3919   
3920   mss = (struct MQStreamState *) mq->impl_state;
3921   GNUNET_assert (GNUNET_STREAM_OK == status);
3922   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
3923   if (GNUNET_OK != ret)
3924   {
3925     if (NULL == mq->error_handler)
3926       LOG (GNUNET_ERROR_TYPE_WARNING,
3927            "read error (message stream malformed), but no error handler installed for message queue\n");
3928     else
3929       mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3930     return 0;
3931   }
3932   /* we always read all data */
3933   mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
3934                                 mq_stream_data_processor, mq);
3935   return size;
3936 }
3937
3938
3939 static void
3940 mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
3941 {
3942   struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3943
3944   if (NULL != mss->rh)
3945   {
3946     GNUNET_STREAM_read_cancel (mss->rh);
3947     mss->rh = NULL;
3948   }
3949
3950   if (NULL != mss->wh)
3951   {
3952     GNUNET_STREAM_write_cancel (mss->wh);
3953     mss->wh = NULL;
3954   }
3955
3956   if (NULL != mss->mst)
3957   {
3958     GNUNET_SERVER_mst_destroy (mss->mst);
3959     mss->mst = NULL;
3960   }
3961
3962   GNUNET_free (mss);
3963 }
3964
3965
3966
3967 /**
3968  * Create a message queue for a stream socket.
3969  *
3970  * @param socket the socket to read/write in the message queue
3971  * @param msg_handlers message handler array
3972  * @param error_handler callback for errors
3973  * @return the message queue for the socket
3974  */
3975 struct GNUNET_MQ_MessageQueue *
3976 GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, 
3977                          const struct GNUNET_MQ_Handler *msg_handlers,
3978                          GNUNET_MQ_ErrorHandler error_handler,
3979                          void *cls)
3980 {
3981   struct GNUNET_MQ_MessageQueue *mq;
3982   struct MQStreamState *mss;
3983
3984   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
3985   mss = GNUNET_new (struct MQStreamState);
3986   mss->socket = socket;
3987   mq->impl_state = mss;
3988   mq->send_impl = mq_stream_send_impl;
3989   mq->destroy_impl = mq_stream_destroy_impl;
3990   mq->handlers = msg_handlers;
3991   mq->handlers_cls = cls;
3992   mq->error_handler = error_handler;
3993   if (NULL != msg_handlers)
3994   {
3995     mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
3996     mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 
3997                                   mq_stream_data_processor, mq);
3998   }
3999   return mq;
4000 }
4001
4002 /* end of stream_api.c */