2b9363c68be1955806162e352da199d135601431
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * The listen socket from which this socket is derived. Should be NULL if it
235    * is not a derived socket
236    */
237   struct GNUNET_STREAM_ListenSocket *lsocket;
238
239   /**
240    * Task identifier for the read io timeout task
241    */
242   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
243
244   /**
245    * Task identifier for retransmission task after timeout
246    */
247   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
248
249   /**
250    * The task for sending timely Acks
251    */
252   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
253
254   /**
255    * Task scheduled to continue a read operation.
256    */
257   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
258
259   /**
260    * The state of the protocol associated with this socket
261    */
262   enum State state;
263
264   /**
265    * The status of the socket
266    */
267   enum GNUNET_STREAM_Status status;
268
269   /**
270    * The number of previous timeouts; FIXME: currently not used
271    */
272   unsigned int retries;
273
274   /**
275    * The peer identity of the peer at the other end of the stream
276    */
277   GNUNET_PEER_Id other_peer;
278
279   /**
280    * Our Peer Identity (for debugging)
281    */
282   GNUNET_PEER_Id our_id;
283
284   /**
285    * The application port number (type: uint32_t)
286    */
287   GNUNET_MESH_ApplicationType app_port;
288
289   /**
290    * The session id associated with this stream connection
291    * FIXME: Not used currently, may be removed
292    */
293   uint32_t session_id;
294
295   /**
296    * Write sequence number. Set to random when sending HELLO(client) and
297    * HELLO_ACK(server) 
298    */
299   uint32_t write_sequence_number;
300
301   /**
302    * Read sequence number. This number's value is determined during handshake
303    */
304   uint32_t read_sequence_number;
305
306   /**
307    * The receiver buffer size
308    */
309   uint32_t receive_buffer_size;
310
311   /**
312    * The receiver buffer boundaries
313    */
314   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
315
316   /**
317    * receiver's available buffer after the last acknowledged packet
318    */
319   uint32_t receiver_window_available;
320
321   /**
322    * The offset pointer used during write operation
323    */
324   uint32_t write_offset;
325
326   /**
327    * The offset after which we are expecting data
328    */
329   uint32_t read_offset;
330
331   /**
332    * The offset upto which user has read from the received buffer
333    */
334   uint32_t copy_offset;
335 };
336
337
338 /**
339  * A socket for listening
340  */
341 struct GNUNET_STREAM_ListenSocket
342 {
343   /**
344    * The mesh handle
345    */
346   struct GNUNET_MESH_Handle *mesh;
347
348   /**
349    * The callback function which is called after successful opening socket
350    */
351   GNUNET_STREAM_ListenCallback listen_cb;
352
353   /**
354    * The call back closure
355    */
356   void *listen_cb_cls;
357
358   /**
359    * Our interned Peer's identity
360    */
361   GNUNET_PEER_Id our_id;
362
363   /**
364    * The service port
365    * FIXME: Remove if not required!
366    */
367   GNUNET_MESH_ApplicationType port;
368 };
369
370
371 /**
372  * The IO Write Handle
373  */
374 struct GNUNET_STREAM_IOWriteHandle
375 {
376   /**
377    * The packet_buffers associated with this Handle
378    */
379   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
380
381   /**
382    * The write continuation callback
383    */
384   GNUNET_STREAM_CompletionContinuation write_cont;
385
386   /**
387    * Write continuation closure
388    */
389   void *write_cont_cls;
390
391   /**
392    * The bitmap of this IOHandle; Corresponding bit for a message is set when
393    * it has been acknowledged by the receiver
394    */
395   GNUNET_STREAM_AckBitmap ack_bitmap;
396
397   /**
398    * Number of bytes in this write handle
399    */
400   size_t size;
401
402   /**
403    * Number of packets sent before waiting for an ack
404    *
405    * FIXME: Do we need this?
406    */
407   unsigned int sent_packets;
408 };
409
410
411 /**
412  * The IO Read Handle
413  */
414 struct GNUNET_STREAM_IOReadHandle
415 {
416   /**
417    * Callback for the read processor
418    */
419   GNUNET_STREAM_DataProcessor proc;
420
421   /**
422    * The closure pointer for the read processor callback
423    */
424   void *proc_cls;
425 };
426
427
428 /**
429  * Default value in seconds for various timeouts
430  */
431 static unsigned int default_timeout = 10;
432
433 /**
434  * Callback function for sending hello message
435  *
436  * @param cls closure the socket
437  * @param size number of bytes available in buf
438  * @param buf where the callee should write the message
439  * @return number of bytes written to buf
440  */
441 static size_t
442 send_message_notify (void *cls, size_t size, void *buf)
443 {
444   struct GNUNET_STREAM_Socket *socket = cls;
445   struct GNUNET_PeerIdentity target;
446   struct MessageQueue *head;
447   size_t ret;
448
449   socket->transmit_handle = NULL; /* Remove the transmit handle */
450   head = socket->queue_head;
451   if (NULL == head)
452     return 0; /* just to be safe */
453   GNUNET_PEER_resolve (socket->other_peer, &target);
454   if (0 == size)                /* request timed out */
455     {
456       socket->retries++;
457       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458                   "Message sending timed out. Retry %d \n",
459                   socket->retries);
460       socket->transmit_handle = 
461         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
462                                            0, /* Corking */
463                                            1, /* Priority */
464                                            /* FIXME: exponential backoff */
465                                            socket->retransmit_timeout,
466                                            &target,
467                                            ntohs (head->message->header.size),
468                                            &send_message_notify,
469                                            socket);
470       return 0;
471     }
472
473   ret = ntohs (head->message->header.size);
474   GNUNET_assert (size >= ret);
475   memcpy (buf, head->message, ret);
476   if (NULL != head->finish_cb)
477     {
478       head->finish_cb (head->finish_cb_cls, socket);
479     }
480   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
481                                socket->queue_tail,
482                                head);
483   GNUNET_free (head->message);
484   GNUNET_free (head);
485   head = socket->queue_head;
486   if (NULL != head)    /* more pending messages to send */
487     {
488       socket->retries = 0;
489       socket->transmit_handle = 
490         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
491                                            0, /* Corking */
492                                            1, /* Priority */
493                                            /* FIXME: exponential backoff */
494                                            socket->retransmit_timeout,
495                                            &target,
496                                            ntohs (head->message->header.size),
497                                            &send_message_notify,
498                                            socket);
499     }
500   return ret;
501 }
502
503
504 /**
505  * Queues a message for sending using the mesh connection of a socket
506  *
507  * @param socket the socket whose mesh connection is used
508  * @param message the message to be sent
509  * @param finish_cb the callback to be called when the message is sent
510  * @param finish_cb_cls the closure for the callback
511  */
512 static void
513 queue_message (struct GNUNET_STREAM_Socket *socket,
514                struct GNUNET_STREAM_MessageHeader *message,
515                SendFinishCallback finish_cb,
516                void *finish_cb_cls)
517 {
518   struct MessageQueue *queue_entity;
519   struct GNUNET_PeerIdentity target;
520
521   GNUNET_assert 
522     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
523      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
524
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526               "%x: Queueing message of type %d and size %d\n",
527               socket->our_id,
528               ntohs (message->header.type),
529               ntohs (message->header.size));
530   GNUNET_assert (NULL != message);
531   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
532   queue_entity->message = message;
533   queue_entity->finish_cb = finish_cb;
534   queue_entity->finish_cb_cls = finish_cb_cls;
535   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
536                                     socket->queue_tail,
537                                     queue_entity);
538   if (NULL == socket->transmit_handle)
539   {
540     socket->retries = 0;
541     GNUNET_PEER_resolve (socket->other_peer, &target);
542     socket->transmit_handle = 
543       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
544                                          0, /* Corking */
545                                          1, /* Priority */
546                                          socket->retransmit_timeout,
547                                          &target,
548                                          ntohs (message->header.size),
549                                          &send_message_notify,
550                                          socket);
551   }
552 }
553
554
555 /**
556  * Copies a message and queues it for sending using the mesh connection of
557  * given socket 
558  *
559  * @param socket the socket whose mesh connection is used
560  * @param message the message to be sent
561  * @param finish_cb the callback to be called when the message is sent
562  * @param finish_cb_cls the closure for the callback
563  */
564 static void
565 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
566                         const struct GNUNET_STREAM_MessageHeader *message,
567                         SendFinishCallback finish_cb,
568                         void *finish_cb_cls)
569 {
570   struct GNUNET_STREAM_MessageHeader *msg_copy;
571   uint16_t size;
572   
573   size = ntohs (message->header.size);
574   msg_copy = GNUNET_malloc (size);
575   memcpy (msg_copy, message, size);
576   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
577 }
578
579
580 /**
581  * Callback function for sending ack message
582  *
583  * @param cls closure the ACK message created in ack_task
584  * @param size number of bytes available in buffer
585  * @param buf where the callee should write the message
586  * @return number of bytes written to buf
587  */
588 static size_t
589 send_ack_notify (void *cls, size_t size, void *buf)
590 {
591   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
592
593   if (0 == size)
594     {
595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596                   "%s called with size 0\n", __func__);
597       return 0;
598     }
599   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
600   
601   size = ntohs (ack_msg->header.header.size);
602   memcpy (buf, ack_msg, size);
603   return size;
604 }
605
606 /**
607  * Writes data using the given socket. The amount of data written is limited by
608  * the receiver_window_size
609  *
610  * @param socket the socket to use
611  */
612 static void 
613 write_data (struct GNUNET_STREAM_Socket *socket);
614
615 /**
616  * Task for retransmitting data messages if they aren't ACK before their ack
617  * deadline 
618  *
619  * @param cls the socket
620  * @param tc the Task context
621  */
622 static void
623 retransmission_timeout_task (void *cls,
624                              const struct GNUNET_SCHEDULER_TaskContext *tc)
625 {
626   struct GNUNET_STREAM_Socket *socket = cls;
627   
628   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
629     return;
630
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "%x: Retransmitting DATA...\n", socket->our_id);
633   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
634   write_data (socket);
635 }
636
637
638 /**
639  * Task for sending ACK message
640  *
641  * @param cls the socket
642  * @param tc the Task context
643  */
644 static void
645 ack_task (void *cls,
646           const struct GNUNET_SCHEDULER_TaskContext *tc)
647 {
648   struct GNUNET_STREAM_Socket *socket = cls;
649   struct GNUNET_STREAM_AckMessage *ack_msg;
650   struct GNUNET_PeerIdentity target;
651
652   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
653     {
654       return;
655     }
656
657   socket->ack_task_id = 0;
658
659   /* Create the ACK Message */
660   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
661   ack_msg->header.header.size = htons (sizeof (struct 
662                                                GNUNET_STREAM_AckMessage));
663   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
664   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
665   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
666   ack_msg->receive_window_remaining = 
667     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
668
669   GNUNET_PEER_resolve (socket->other_peer, &target);
670   /* Request MESH for sending ACK */
671   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
672                                      0, /* Corking */
673                                      1, /* Priority */
674                                      socket->retransmit_timeout,
675                                      &target,
676                                      ntohs (ack_msg->header.header.size),
677                                      &send_ack_notify,
678                                      ack_msg);
679
680   
681 }
682
683
684 /**
685  * Function to modify a bit in GNUNET_STREAM_AckBitmap
686  *
687  * @param bitmap the bitmap to modify
688  * @param bit the bit number to modify
689  * @param value GNUNET_YES to on, GNUNET_NO to off
690  */
691 static void
692 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
693                       unsigned int bit, 
694                       int value)
695 {
696   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
697   if (GNUNET_YES == value)
698     *bitmap |= (1LL << bit);
699   else
700     *bitmap &= ~(1LL << bit);
701 }
702
703
704 /**
705  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
706  *
707  * @param bitmap address of the bitmap that has to be checked
708  * @param bit the bit number to check
709  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
710  */
711 static uint8_t
712 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
713                       unsigned int bit)
714 {
715   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
716   return 0 != (*bitmap & (1LL << bit));
717 }
718
719
720
721 /**
722  * Function called when Data Message is sent
723  *
724  * @param cls the io_handle corresponding to the Data Message
725  * @param socket the socket which was used
726  */
727 static void
728 write_data_finish_cb (void *cls,
729                       struct GNUNET_STREAM_Socket *socket)
730 {
731   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
732
733   io_handle->sent_packets++;
734 }
735
736
737 /**
738  * Writes data using the given socket. The amount of data written is limited by
739  * the receiver_window_size
740  *
741  * @param socket the socket to use
742  */
743 static void 
744 write_data (struct GNUNET_STREAM_Socket *socket)
745 {
746   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
747   int packet;                   /* Although an int, should never be negative */
748   int ack_packet;
749
750   ack_packet = -1;
751   /* Find the last acknowledged packet */
752   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
753     {      
754       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
755                                               packet))
756         ack_packet = packet;        
757       else if (NULL == io_handle->messages[packet])
758         break;
759     }
760   /* Resend packets which weren't ack'ed */
761   for (packet=0; packet < ack_packet; packet++)
762     {
763       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
764                                              packet))
765         {
766           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767                       "%x: Placing DATA message with sequence %u in send queue\n",
768                       socket->our_id,
769                       ntohl (io_handle->messages[packet]->sequence_number));
770
771           copy_and_queue_message (socket,
772                                   &io_handle->messages[packet]->header,
773                                   NULL,
774                                   NULL);
775         }
776     }
777   packet = ack_packet + 1;
778   /* Now send new packets if there is enough buffer space */
779   while ( (NULL != io_handle->messages[packet]) &&
780           (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
781     {
782       socket->receiver_window_available -= 
783         ntohs (io_handle->messages[packet]->header.header.size);
784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785                   "%x: Placing DATA message with sequence %u in send queue\n",
786                   socket->our_id,
787                   ntohl (io_handle->messages[packet]->sequence_number));
788       copy_and_queue_message (socket,
789                               &io_handle->messages[packet]->header,
790                               &write_data_finish_cb,
791                               io_handle);
792       packet++;
793     }
794
795   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
796     socket->retransmission_timeout_task_id = 
797       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
798                                     (GNUNET_TIME_UNIT_SECONDS, 5),
799                                     &retransmission_timeout_task,
800                                     socket);
801 }
802
803
804 /**
805  * Task for calling the read processor
806  *
807  * @param cls the socket
808  * @param tc the task context
809  */
810 static void
811 call_read_processor (void *cls,
812                           const struct GNUNET_SCHEDULER_TaskContext *tc)
813 {
814   struct GNUNET_STREAM_Socket *socket = cls;
815   size_t read_size;
816   size_t valid_read_size;
817   unsigned int packet;
818   uint32_t sequence_increase;
819   uint32_t offset_increase;
820
821   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
822   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
823     return;
824
825   GNUNET_assert (NULL != socket->read_handle);
826   GNUNET_assert (NULL != socket->read_handle->proc);
827
828   /* Check the bitmap for any holes */
829   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
830     {
831       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
832                                              packet))
833         break;
834     }
835   /* We only call read processor if we have the first packet */
836   GNUNET_assert (0 < packet);
837
838   valid_read_size = 
839     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
840
841   GNUNET_assert (0 != valid_read_size);
842
843   /* Cancel the read_io_timeout_task */
844   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
845   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
846
847   /* Call the data processor */
848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849               "%x: Calling read processor\n",
850               socket->our_id);
851   read_size = 
852     socket->read_handle->proc (socket->read_handle->proc_cls,
853                                socket->status,
854                                socket->receive_buffer + socket->copy_offset,
855                                valid_read_size);
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "%x: Read processor read %d bytes\n",
858               socket->our_id,
859               read_size);
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "%x: Read processor completed successfully\n",
862               socket->our_id);
863
864   /* Free the read handle */
865   GNUNET_free (socket->read_handle);
866   socket->read_handle = NULL;
867
868   GNUNET_assert (read_size <= valid_read_size);
869   socket->copy_offset += read_size;
870
871   /* Determine upto which packet we can remove from the buffer */
872   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
873     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
874       break;
875
876   /* If no packets can be removed we can't move the buffer */
877   if (0 == packet) return;
878
879   sequence_increase = packet;
880
881   /* Shift the data in the receive buffer */
882   memmove (socket->receive_buffer,
883            socket->receive_buffer 
884            + socket->receive_buffer_boundaries[sequence_increase-1],
885            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
886   
887   /* Shift the bitmap */
888   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
889   
890   /* Set read_sequence_number */
891   socket->read_sequence_number += sequence_increase;
892   
893   /* Set read_offset */
894   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
895   socket->read_offset += offset_increase;
896
897   /* Fix copy_offset */
898   GNUNET_assert (offset_increase <= socket->copy_offset);
899   socket->copy_offset -= offset_increase;
900   
901   /* Fix relative boundaries */
902   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
903     {
904       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
905         {
906           socket->receive_buffer_boundaries[packet] = 
907             socket->receive_buffer_boundaries[packet + sequence_increase] 
908             - offset_increase;
909         }
910       else
911         socket->receive_buffer_boundaries[packet] = 0;
912     }
913 }
914
915
916 /**
917  * Cancels the existing read io handle
918  *
919  * @param cls the closure from the SCHEDULER call
920  * @param tc the task context
921  */
922 static void
923 read_io_timeout (void *cls, 
924                 const struct GNUNET_SCHEDULER_TaskContext *tc)
925 {
926   struct GNUNET_STREAM_Socket *socket = cls;
927   GNUNET_STREAM_DataProcessor proc;
928   void *proc_cls;
929
930   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
931   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
932   {
933     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934                 "%x: Read task timedout - Cancelling it\n",
935                 socket->our_id);
936     GNUNET_SCHEDULER_cancel (socket->read_task_id);
937     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
938   }
939   GNUNET_assert (NULL != socket->read_handle);
940   proc = socket->read_handle->proc;
941   proc_cls = socket->read_handle->proc_cls;
942
943   GNUNET_free (socket->read_handle);
944   socket->read_handle = NULL;
945   /* Call the read processor to signal timeout */
946   proc (proc_cls,
947         GNUNET_STREAM_TIMEOUT,
948         NULL,
949         0);
950 }
951
952
953 /**
954  * Handler for DATA messages; Same for both client and server
955  *
956  * @param socket the socket through which the ack was received
957  * @param tunnel connection to the other end
958  * @param sender who sent the message
959  * @param msg the data message
960  * @param atsi performance data for the connection
961  * @return GNUNET_OK to keep the connection open,
962  *         GNUNET_SYSERR to close it (signal serious error)
963  */
964 static int
965 handle_data (struct GNUNET_STREAM_Socket *socket,
966              struct GNUNET_MESH_Tunnel *tunnel,
967              const struct GNUNET_PeerIdentity *sender,
968              const struct GNUNET_STREAM_DataMessage *msg,
969              const struct GNUNET_ATS_Information*atsi)
970 {
971   const void *payload;
972   uint32_t bytes_needed;
973   uint32_t relative_offset;
974   uint32_t relative_sequence_number;
975   uint16_t size;
976
977   size = htons (msg->header.header.size);
978   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
979     {
980       GNUNET_break_op (0);
981       return GNUNET_SYSERR;
982     }
983
984   if (GNUNET_PEER_search (sender) != socket->other_peer)
985     {
986       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987                   "%x: Received DATA from non-confirming peer\n",
988                   socket->our_id);
989       return GNUNET_YES;
990     }
991
992   switch (socket->state)
993     {
994     case STATE_ESTABLISHED:
995     case STATE_TRANSMIT_CLOSED:
996     case STATE_TRANSMIT_CLOSE_WAIT:
997       
998       /* check if the message's sequence number is in the range we are
999          expecting */
1000       relative_sequence_number = 
1001         ntohl (msg->sequence_number) - socket->read_sequence_number;
1002       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1003         {
1004           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1005                       "%x: Ignoring received message with sequence number %u\n",
1006                       socket->our_id,
1007                       ntohl (msg->sequence_number));
1008           /* Start ACK sending task if one is not already present */
1009           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1010             {
1011               socket->ack_task_id = 
1012                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1013                                               (msg->ack_deadline),
1014                                               &ack_task,
1015                                               socket);
1016             }
1017           return GNUNET_YES;
1018         }
1019       
1020       /* Check if we have already seen this message */
1021       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1022                                               relative_sequence_number))
1023         {
1024           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025                       "%x: Ignoring already received message with sequence "
1026                       "number %u\n",
1027                       socket->our_id,
1028                       ntohl (msg->sequence_number));
1029           /* Start ACK sending task if one is not already present */
1030           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1031             {
1032               socket->ack_task_id = 
1033                 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1034                                               (msg->ack_deadline),
1035                                               &ack_task,
1036                                               socket);
1037             }
1038           return GNUNET_YES;
1039         }
1040
1041       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042                   "%x: Receiving DATA with sequence number: %u and size: %d "
1043                   "from %x\n",
1044                   socket->our_id,
1045                   ntohl (msg->sequence_number),
1046                   ntohs (msg->header.header.size),
1047                   socket->other_peer);
1048
1049       /* Check if we have to allocate the buffer */
1050       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1051       relative_offset = ntohl (msg->offset) - socket->read_offset;
1052       bytes_needed = relative_offset + size;
1053       if (bytes_needed > socket->receive_buffer_size)
1054         {
1055           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1056             {
1057               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1058                                                        bytes_needed);
1059               socket->receive_buffer_size = bytes_needed;
1060             }
1061           else
1062             {
1063               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                           "%x: Cannot accommodate packet %d as buffer is",
1065                           "full\n",
1066                           socket->our_id,
1067                           ntohl (msg->sequence_number));
1068               return GNUNET_YES;
1069             }
1070         }
1071       
1072       /* Copy Data to buffer */
1073       payload = &msg[1];
1074       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1075       memcpy (socket->receive_buffer + relative_offset,
1076               payload,
1077               size);
1078       socket->receive_buffer_boundaries[relative_sequence_number] = 
1079         relative_offset + size;
1080       
1081       /* Modify the ACK bitmap */
1082       ackbitmap_modify_bit (&socket->ack_bitmap,
1083                             relative_sequence_number,
1084                             GNUNET_YES);
1085
1086       /* Start ACK sending task if one is not already present */
1087       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1088        {
1089          socket->ack_task_id = 
1090            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1091                                          (msg->ack_deadline),
1092                                          &ack_task,
1093                                          socket);
1094        }
1095
1096       if ((NULL != socket->read_handle) /* A read handle is waiting */
1097           /* There is no current read task */
1098           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1099           /* We have the first packet */
1100           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1101                                                  0)))
1102         {
1103           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1104                       "%x: Scheduling read processor\n",
1105                       socket->our_id);
1106
1107           socket->read_task_id = 
1108             GNUNET_SCHEDULER_add_now (&call_read_processor,
1109                                       socket);
1110         }
1111       
1112       break;
1113
1114     default:
1115       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116                   "%x: Received data message when it cannot be handled\n",
1117                   socket->our_id);
1118       break;
1119     }
1120   return GNUNET_YES;
1121 }
1122
1123 /**
1124  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1125  *
1126  * @param cls the socket (set from GNUNET_MESH_connect)
1127  * @param tunnel connection to the other end
1128  * @param tunnel_ctx place to store local state associated with the tunnel
1129  * @param sender who sent the message
1130  * @param message the actual message
1131  * @param atsi performance data for the connection
1132  * @return GNUNET_OK to keep the connection open,
1133  *         GNUNET_SYSERR to close it (signal serious error)
1134  */
1135 static int
1136 client_handle_data (void *cls,
1137              struct GNUNET_MESH_Tunnel *tunnel,
1138              void **tunnel_ctx,
1139              const struct GNUNET_PeerIdentity *sender,
1140              const struct GNUNET_MessageHeader *message,
1141              const struct GNUNET_ATS_Information*atsi)
1142 {
1143   struct GNUNET_STREAM_Socket *socket = cls;
1144
1145   return handle_data (socket, 
1146                       tunnel, 
1147                       sender, 
1148                       (const struct GNUNET_STREAM_DataMessage *) message, 
1149                       atsi);
1150 }
1151
1152
1153 /**
1154  * Callback to set state to ESTABLISHED
1155  *
1156  * @param cls the closure from queue_message FIXME: document
1157  * @param socket the socket to requiring state change
1158  */
1159 static void
1160 set_state_established (void *cls,
1161                        struct GNUNET_STREAM_Socket *socket)
1162 {
1163   struct GNUNET_PeerIdentity initiator_pid;
1164
1165   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1166               "%x: Attaining ESTABLISHED state\n",
1167               socket->our_id);
1168   socket->write_offset = 0;
1169   socket->read_offset = 0;
1170   socket->state = STATE_ESTABLISHED;
1171   /* FIXME: What if listen_cb is NULL */
1172   if (NULL != socket->lsocket)
1173     {
1174       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1175       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176                   "%x: Calling listen callback\n",
1177                   socket->our_id);
1178       if (GNUNET_SYSERR == 
1179           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1180                                       socket,
1181                                       &initiator_pid))
1182         {
1183           socket->state = STATE_CLOSED;
1184           /* FIXME: We should close in a decent way */
1185           GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1186           GNUNET_free (socket);
1187         }
1188     }
1189   else if (socket->open_cb)
1190     socket->open_cb (socket->open_cls, socket);
1191 }
1192
1193
1194 /**
1195  * Callback to set state to HELLO_WAIT
1196  *
1197  * @param cls the closure from queue_message
1198  * @param socket the socket to requiring state change
1199  */
1200 static void
1201 set_state_hello_wait (void *cls,
1202                       struct GNUNET_STREAM_Socket *socket)
1203 {
1204   GNUNET_assert (STATE_INIT == socket->state);
1205   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1206               "%x: Attaining HELLO_WAIT state\n",
1207               socket->our_id);
1208   socket->state = STATE_HELLO_WAIT;
1209 }
1210
1211
1212 /**
1213  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1214  *
1215  * @param cls the socket (set from GNUNET_MESH_connect)
1216  * @param tunnel connection to the other end
1217  * @param tunnel_ctx this is NULL
1218  * @param sender who sent the message
1219  * @param message the actual message
1220  * @param atsi performance data for the connection
1221  * @return GNUNET_OK to keep the connection open,
1222  *         GNUNET_SYSERR to close it (signal serious error)
1223  */
1224 static int
1225 client_handle_hello_ack (void *cls,
1226                          struct GNUNET_MESH_Tunnel *tunnel,
1227                          void **tunnel_ctx,
1228                          const struct GNUNET_PeerIdentity *sender,
1229                          const struct GNUNET_MessageHeader *message,
1230                          const struct GNUNET_ATS_Information*atsi)
1231 {
1232   struct GNUNET_STREAM_Socket *socket = cls;
1233   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1234   struct GNUNET_STREAM_HelloAckMessage *reply;
1235
1236   if (GNUNET_PEER_search (sender) != socket->other_peer)
1237     {
1238       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239                   "%x: Received HELLO_ACK from non-confirming peer\n",
1240                   socket->our_id);
1241       return GNUNET_YES;
1242     }
1243   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1245               "%x: Received HELLO_ACK from %x\n",
1246               socket->our_id,
1247               socket->other_peer);
1248
1249   GNUNET_assert (socket->tunnel == tunnel);
1250   switch (socket->state)
1251   {
1252   case STATE_HELLO_WAIT:
1253     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1254     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255                 "%x: Read sequence number %u\n",
1256                 socket->our_id,
1257                 (unsigned int) socket->read_sequence_number);
1258     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1259     /* Get the random sequence number */
1260     socket->write_sequence_number = 
1261       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1262       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1263                   "%x: Generated write sequence number %u\n",
1264                   socket->our_id,
1265                   (unsigned int) socket->write_sequence_number);
1266     reply = 
1267       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1268     reply->header.header.size = 
1269       htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1270     reply->header.header.type = 
1271       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1272     reply->sequence_number = htonl (socket->write_sequence_number);
1273     reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1274     queue_message (socket, 
1275                    &reply->header, 
1276                    &set_state_established, 
1277                    NULL);      
1278     return GNUNET_OK;
1279   case STATE_ESTABLISHED:
1280   case STATE_RECEIVE_CLOSE_WAIT:
1281     // call statistics (# ACKs ignored++)
1282     return GNUNET_OK;
1283   case STATE_INIT:
1284   default:
1285     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1287                 socket->our_id,
1288                 socket->other_peer,
1289                 socket->state);
1290     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1291     return GNUNET_SYSERR;
1292   }
1293
1294 }
1295
1296
1297 /**
1298  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1299  *
1300  * @param cls the socket (set from GNUNET_MESH_connect)
1301  * @param tunnel connection to the other end
1302  * @param tunnel_ctx this is NULL
1303  * @param sender who sent the message
1304  * @param message the actual message
1305  * @param atsi performance data for the connection
1306  * @return GNUNET_OK to keep the connection open,
1307  *         GNUNET_SYSERR to close it (signal serious error)
1308  */
1309 static int
1310 client_handle_reset (void *cls,
1311                      struct GNUNET_MESH_Tunnel *tunnel,
1312                      void **tunnel_ctx,
1313                      const struct GNUNET_PeerIdentity *sender,
1314                      const struct GNUNET_MessageHeader *message,
1315                      const struct GNUNET_ATS_Information*atsi)
1316 {
1317   struct GNUNET_STREAM_Socket *socket = cls;
1318
1319   return GNUNET_OK;
1320 }
1321
1322
1323 /**
1324  * Common message handler for handling TRANSMIT_CLOSE messages
1325  *
1326  * @param socket the socket through which the ack was received
1327  * @param tunnel connection to the other end
1328  * @param sender who sent the message
1329  * @param msg the transmit close message
1330  * @param atsi performance data for the connection
1331  * @return GNUNET_OK to keep the connection open,
1332  *         GNUNET_SYSERR to close it (signal serious error)
1333  */
1334 static int
1335 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1336                        struct GNUNET_MESH_Tunnel *tunnel,
1337                        const struct GNUNET_PeerIdentity *sender,
1338                        const struct GNUNET_STREAM_MessageHeader *msg,
1339                        const struct GNUNET_ATS_Information*atsi)
1340 {
1341   struct GNUNET_STREAM_MessageHeader *reply;
1342
1343   switch (socket->state)
1344     {
1345     case STATE_ESTABLISHED:
1346       socket->state = STATE_RECEIVE_CLOSED;
1347
1348       /* Send TRANSMIT_CLOSE_ACK */
1349       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1350       reply->header.type = 
1351         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1352       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1353       queue_message (socket, reply, NULL, NULL);
1354       break;
1355
1356     default:
1357       /* FIXME: Call statistics? */
1358       break;
1359     }
1360   return GNUNET_YES;
1361 }
1362
1363
1364 /**
1365  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1366  *
1367  * @param cls the socket (set from GNUNET_MESH_connect)
1368  * @param tunnel connection to the other end
1369  * @param tunnel_ctx this is NULL
1370  * @param sender who sent the message
1371  * @param message the actual message
1372  * @param atsi performance data for the connection
1373  * @return GNUNET_OK to keep the connection open,
1374  *         GNUNET_SYSERR to close it (signal serious error)
1375  */
1376 static int
1377 client_handle_transmit_close (void *cls,
1378                               struct GNUNET_MESH_Tunnel *tunnel,
1379                               void **tunnel_ctx,
1380                               const struct GNUNET_PeerIdentity *sender,
1381                               const struct GNUNET_MessageHeader *message,
1382                               const struct GNUNET_ATS_Information*atsi)
1383 {
1384   struct GNUNET_STREAM_Socket *socket = cls;
1385   
1386   return handle_transmit_close (socket,
1387                                 tunnel,
1388                                 sender,
1389                                 (struct GNUNET_STREAM_MessageHeader *)message,
1390                                 atsi);
1391 }
1392
1393
1394 /**
1395  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1396  *
1397  * @param cls the socket (set from GNUNET_MESH_connect)
1398  * @param tunnel connection to the other end
1399  * @param tunnel_ctx this is NULL
1400  * @param sender who sent the message
1401  * @param message the actual message
1402  * @param atsi performance data for the connection
1403  * @return GNUNET_OK to keep the connection open,
1404  *         GNUNET_SYSERR to close it (signal serious error)
1405  */
1406 static int
1407 client_handle_transmit_close_ack (void *cls,
1408                                   struct GNUNET_MESH_Tunnel *tunnel,
1409                                   void **tunnel_ctx,
1410                                   const struct GNUNET_PeerIdentity *sender,
1411                                   const struct GNUNET_MessageHeader *message,
1412                                   const struct GNUNET_ATS_Information*atsi)
1413 {
1414   struct GNUNET_STREAM_Socket *socket = cls;
1415
1416   return GNUNET_OK;
1417 }
1418
1419
1420 /**
1421  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1422  *
1423  * @param cls the socket (set from GNUNET_MESH_connect)
1424  * @param tunnel connection to the other end
1425  * @param tunnel_ctx this is NULL
1426  * @param sender who sent the message
1427  * @param message the actual message
1428  * @param atsi performance data for the connection
1429  * @return GNUNET_OK to keep the connection open,
1430  *         GNUNET_SYSERR to close it (signal serious error)
1431  */
1432 static int
1433 client_handle_receive_close (void *cls,
1434                              struct GNUNET_MESH_Tunnel *tunnel,
1435                              void **tunnel_ctx,
1436                              const struct GNUNET_PeerIdentity *sender,
1437                              const struct GNUNET_MessageHeader *message,
1438                              const struct GNUNET_ATS_Information*atsi)
1439 {
1440   struct GNUNET_STREAM_Socket *socket = cls;
1441
1442   return GNUNET_OK;
1443 }
1444
1445
1446 /**
1447  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1448  *
1449  * @param cls the socket (set from GNUNET_MESH_connect)
1450  * @param tunnel connection to the other end
1451  * @param tunnel_ctx this is NULL
1452  * @param sender who sent the message
1453  * @param message the actual message
1454  * @param atsi performance data for the connection
1455  * @return GNUNET_OK to keep the connection open,
1456  *         GNUNET_SYSERR to close it (signal serious error)
1457  */
1458 static int
1459 client_handle_receive_close_ack (void *cls,
1460                                  struct GNUNET_MESH_Tunnel *tunnel,
1461                                  void **tunnel_ctx,
1462                                  const struct GNUNET_PeerIdentity *sender,
1463                                  const struct GNUNET_MessageHeader *message,
1464                                  const struct GNUNET_ATS_Information*atsi)
1465 {
1466   struct GNUNET_STREAM_Socket *socket = cls;
1467
1468   return GNUNET_OK;
1469 }
1470
1471
1472 /**
1473  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1474  *
1475  * @param cls the socket (set from GNUNET_MESH_connect)
1476  * @param tunnel connection to the other end
1477  * @param tunnel_ctx this is NULL
1478  * @param sender who sent the message
1479  * @param message the actual message
1480  * @param atsi performance data for the connection
1481  * @return GNUNET_OK to keep the connection open,
1482  *         GNUNET_SYSERR to close it (signal serious error)
1483  */
1484 static int
1485 client_handle_close (void *cls,
1486                      struct GNUNET_MESH_Tunnel *tunnel,
1487                      void **tunnel_ctx,
1488                      const struct GNUNET_PeerIdentity *sender,
1489                      const struct GNUNET_MessageHeader *message,
1490                      const struct GNUNET_ATS_Information*atsi)
1491 {
1492   struct GNUNET_STREAM_Socket *socket = cls;
1493
1494   return GNUNET_OK;
1495 }
1496
1497
1498 /**
1499  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1500  *
1501  * @param cls the socket (set from GNUNET_MESH_connect)
1502  * @param tunnel connection to the other end
1503  * @param tunnel_ctx this is NULL
1504  * @param sender who sent the message
1505  * @param message the actual message
1506  * @param atsi performance data for the connection
1507  * @return GNUNET_OK to keep the connection open,
1508  *         GNUNET_SYSERR to close it (signal serious error)
1509  */
1510 static int
1511 client_handle_close_ack (void *cls,
1512                          struct GNUNET_MESH_Tunnel *tunnel,
1513                          void **tunnel_ctx,
1514                          const struct GNUNET_PeerIdentity *sender,
1515                          const struct GNUNET_MessageHeader *message,
1516                          const struct GNUNET_ATS_Information*atsi)
1517 {
1518   struct GNUNET_STREAM_Socket *socket = cls;
1519
1520   return GNUNET_OK;
1521 }
1522
1523 /*****************************/
1524 /* Server's Message Handlers */
1525 /*****************************/
1526
1527 /**
1528  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1529  *
1530  * @param cls the closure
1531  * @param tunnel connection to the other end
1532  * @param tunnel_ctx the socket
1533  * @param sender who sent the message
1534  * @param message the actual message
1535  * @param atsi performance data for the connection
1536  * @return GNUNET_OK to keep the connection open,
1537  *         GNUNET_SYSERR to close it (signal serious error)
1538  */
1539 static int
1540 server_handle_data (void *cls,
1541                     struct GNUNET_MESH_Tunnel *tunnel,
1542                     void **tunnel_ctx,
1543                     const struct GNUNET_PeerIdentity *sender,
1544                     const struct GNUNET_MessageHeader *message,
1545                     const struct GNUNET_ATS_Information*atsi)
1546 {
1547   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1548
1549   return handle_data (socket,
1550                       tunnel,
1551                       sender,
1552                       (const struct GNUNET_STREAM_DataMessage *)message,
1553                       atsi);
1554 }
1555
1556
1557 /**
1558  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1559  *
1560  * @param cls the closure
1561  * @param tunnel connection to the other end
1562  * @param tunnel_ctx the socket
1563  * @param sender who sent the message
1564  * @param message the actual message
1565  * @param atsi performance data for the connection
1566  * @return GNUNET_OK to keep the connection open,
1567  *         GNUNET_SYSERR to close it (signal serious error)
1568  */
1569 static int
1570 server_handle_hello (void *cls,
1571                      struct GNUNET_MESH_Tunnel *tunnel,
1572                      void **tunnel_ctx,
1573                      const struct GNUNET_PeerIdentity *sender,
1574                      const struct GNUNET_MessageHeader *message,
1575                      const struct GNUNET_ATS_Information*atsi)
1576 {
1577   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1578   struct GNUNET_STREAM_HelloAckMessage *reply;
1579
1580   if (GNUNET_PEER_search (sender) != socket->other_peer)
1581     {
1582       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1583                   "%x: Received HELLO from non-confirming peer\n",
1584                   socket->our_id);
1585       return GNUNET_YES;
1586     }
1587
1588   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1589                  ntohs (message->type));
1590   GNUNET_assert (socket->tunnel == tunnel);
1591   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1592               "%x: Received HELLO from %x\n", 
1593               socket->our_id,
1594               socket->other_peer);
1595
1596   if (STATE_INIT == socket->state)
1597     {
1598       /* Get the random sequence number */
1599       socket->write_sequence_number = 
1600         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1602                   "%x: Generated write sequence number %u\n",
1603                   socket->our_id,
1604                   (unsigned int) socket->write_sequence_number);
1605       reply = 
1606         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1607       reply->header.header.size = 
1608         htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1609       reply->header.header.type = 
1610         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1611       reply->sequence_number = htonl (socket->write_sequence_number);
1612       reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1613       queue_message (socket, 
1614                      &reply->header,
1615                      &set_state_hello_wait, 
1616                      NULL);
1617     }
1618   else
1619     {
1620       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1621                   "Client sent HELLO when in state %d\n", socket->state);
1622       /* FIXME: Send RESET? */
1623       
1624     }
1625   return GNUNET_OK;
1626 }
1627
1628
1629 /**
1630  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1631  *
1632  * @param cls the closure
1633  * @param tunnel connection to the other end
1634  * @param tunnel_ctx the socket
1635  * @param sender who sent the message
1636  * @param message the actual message
1637  * @param atsi performance data for the connection
1638  * @return GNUNET_OK to keep the connection open,
1639  *         GNUNET_SYSERR to close it (signal serious error)
1640  */
1641 static int
1642 server_handle_hello_ack (void *cls,
1643                          struct GNUNET_MESH_Tunnel *tunnel,
1644                          void **tunnel_ctx,
1645                          const struct GNUNET_PeerIdentity *sender,
1646                          const struct GNUNET_MessageHeader *message,
1647                          const struct GNUNET_ATS_Information*atsi)
1648 {
1649   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1650   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1651
1652   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1653                  ntohs (message->type));
1654   GNUNET_assert (socket->tunnel == tunnel);
1655   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1656   if (STATE_HELLO_WAIT == socket->state)
1657     {
1658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659                   "%x: Received HELLO_ACK from %x\n",
1660                   socket->our_id,
1661                   socket->other_peer);
1662       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1663       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1664                   "%x: Read sequence number %u\n",
1665                   socket->our_id,
1666                   (unsigned int) socket->read_sequence_number);
1667       socket->receiver_window_available = 
1668         ntohl (ack_message->receiver_window_size);
1669       /* Attain ESTABLISHED state */
1670       set_state_established (NULL, socket);
1671     }
1672   else
1673     {
1674       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1675                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1676       /* FIXME: Send RESET? */
1677       
1678     }
1679   return GNUNET_OK;
1680 }
1681
1682
1683 /**
1684  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1685  *
1686  * @param cls the closure
1687  * @param tunnel connection to the other end
1688  * @param tunnel_ctx the socket
1689  * @param sender who sent the message
1690  * @param message the actual message
1691  * @param atsi performance data for the connection
1692  * @return GNUNET_OK to keep the connection open,
1693  *         GNUNET_SYSERR to close it (signal serious error)
1694  */
1695 static int
1696 server_handle_reset (void *cls,
1697                      struct GNUNET_MESH_Tunnel *tunnel,
1698                      void **tunnel_ctx,
1699                      const struct GNUNET_PeerIdentity *sender,
1700                      const struct GNUNET_MessageHeader *message,
1701                      const struct GNUNET_ATS_Information*atsi)
1702 {
1703   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1704
1705   return GNUNET_OK;
1706 }
1707
1708
1709 /**
1710  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1711  *
1712  * @param cls the closure
1713  * @param tunnel connection to the other end
1714  * @param tunnel_ctx the socket
1715  * @param sender who sent the message
1716  * @param message the actual message
1717  * @param atsi performance data for the connection
1718  * @return GNUNET_OK to keep the connection open,
1719  *         GNUNET_SYSERR to close it (signal serious error)
1720  */
1721 static int
1722 server_handle_transmit_close (void *cls,
1723                               struct GNUNET_MESH_Tunnel *tunnel,
1724                               void **tunnel_ctx,
1725                               const struct GNUNET_PeerIdentity *sender,
1726                               const struct GNUNET_MessageHeader *message,
1727                               const struct GNUNET_ATS_Information*atsi)
1728 {
1729   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1730
1731   return handle_transmit_close (socket,
1732                                 tunnel,
1733                                 sender,
1734                                 (struct GNUNET_STREAM_MessageHeader *)message,
1735                                 atsi);
1736 }
1737
1738
1739 /**
1740  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1741  *
1742  * @param cls the closure
1743  * @param tunnel connection to the other end
1744  * @param tunnel_ctx the socket
1745  * @param sender who sent the message
1746  * @param message the actual message
1747  * @param atsi performance data for the connection
1748  * @return GNUNET_OK to keep the connection open,
1749  *         GNUNET_SYSERR to close it (signal serious error)
1750  */
1751 static int
1752 server_handle_transmit_close_ack (void *cls,
1753                                   struct GNUNET_MESH_Tunnel *tunnel,
1754                                   void **tunnel_ctx,
1755                                   const struct GNUNET_PeerIdentity *sender,
1756                                   const struct GNUNET_MessageHeader *message,
1757                                   const struct GNUNET_ATS_Information*atsi)
1758 {
1759   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1760
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1767  *
1768  * @param cls the closure
1769  * @param tunnel connection to the other end
1770  * @param tunnel_ctx the socket
1771  * @param sender who sent the message
1772  * @param message the actual message
1773  * @param atsi performance data for the connection
1774  * @return GNUNET_OK to keep the connection open,
1775  *         GNUNET_SYSERR to close it (signal serious error)
1776  */
1777 static int
1778 server_handle_receive_close (void *cls,
1779                              struct GNUNET_MESH_Tunnel *tunnel,
1780                              void **tunnel_ctx,
1781                              const struct GNUNET_PeerIdentity *sender,
1782                              const struct GNUNET_MessageHeader *message,
1783                              const struct GNUNET_ATS_Information*atsi)
1784 {
1785   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1786
1787   return GNUNET_OK;
1788 }
1789
1790
1791 /**
1792  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1793  *
1794  * @param cls the closure
1795  * @param tunnel connection to the other end
1796  * @param tunnel_ctx the socket
1797  * @param sender who sent the message
1798  * @param message the actual message
1799  * @param atsi performance data for the connection
1800  * @return GNUNET_OK to keep the connection open,
1801  *         GNUNET_SYSERR to close it (signal serious error)
1802  */
1803 static int
1804 server_handle_receive_close_ack (void *cls,
1805                                  struct GNUNET_MESH_Tunnel *tunnel,
1806                                  void **tunnel_ctx,
1807                                  const struct GNUNET_PeerIdentity *sender,
1808                                  const struct GNUNET_MessageHeader *message,
1809                                  const struct GNUNET_ATS_Information*atsi)
1810 {
1811   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1812
1813   return GNUNET_OK;
1814 }
1815
1816
1817 /**
1818  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1819  *
1820  * @param cls the closure
1821  * @param tunnel connection to the other end
1822  * @param tunnel_ctx the socket
1823  * @param sender who sent the message
1824  * @param message the actual message
1825  * @param atsi performance data for the connection
1826  * @return GNUNET_OK to keep the connection open,
1827  *         GNUNET_SYSERR to close it (signal serious error)
1828  */
1829 static int
1830 server_handle_close (void *cls,
1831                      struct GNUNET_MESH_Tunnel *tunnel,
1832                      void **tunnel_ctx,
1833                      const struct GNUNET_PeerIdentity *sender,
1834                      const struct GNUNET_MessageHeader *message,
1835                      const struct GNUNET_ATS_Information*atsi)
1836 {
1837   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1838
1839   return GNUNET_OK;
1840 }
1841
1842
1843 /**
1844  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1845  *
1846  * @param cls the closure
1847  * @param tunnel connection to the other end
1848  * @param tunnel_ctx the socket
1849  * @param sender who sent the message
1850  * @param message the actual message
1851  * @param atsi performance data for the connection
1852  * @return GNUNET_OK to keep the connection open,
1853  *         GNUNET_SYSERR to close it (signal serious error)
1854  */
1855 static int
1856 server_handle_close_ack (void *cls,
1857                          struct GNUNET_MESH_Tunnel *tunnel,
1858                          void **tunnel_ctx,
1859                          const struct GNUNET_PeerIdentity *sender,
1860                          const struct GNUNET_MessageHeader *message,
1861                          const struct GNUNET_ATS_Information*atsi)
1862 {
1863   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1864
1865   return GNUNET_OK;
1866 }
1867
1868
1869 /**
1870  * Message Handler for mesh
1871  *
1872  * @param socket the socket through which the ack was received
1873  * @param tunnel connection to the other end
1874  * @param sender who sent the message
1875  * @param ack the acknowledgment message
1876  * @param atsi performance data for the connection
1877  * @return GNUNET_OK to keep the connection open,
1878  *         GNUNET_SYSERR to close it (signal serious error)
1879  */
1880 static int
1881 handle_ack (struct GNUNET_STREAM_Socket *socket,
1882             struct GNUNET_MESH_Tunnel *tunnel,
1883             const struct GNUNET_PeerIdentity *sender,
1884             const struct GNUNET_STREAM_AckMessage *ack,
1885             const struct GNUNET_ATS_Information*atsi)
1886 {
1887   unsigned int packet;
1888   int need_retransmission;
1889
1890   if (GNUNET_PEER_search (sender) != socket->other_peer)
1891     {
1892       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1893                   "%x: Received ACK from non-confirming peer\n",
1894                   socket->our_id);
1895       return GNUNET_YES;
1896     }
1897
1898   switch (socket->state)
1899     {
1900     case (STATE_ESTABLISHED):
1901       if (NULL == socket->write_handle)
1902         {
1903           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904                       "%x: Received DATA_ACK when write_handle is NULL\n",
1905                       socket->our_id);
1906           return GNUNET_OK;
1907         }
1908       /* FIXME: increment in the base sequence number is breaking current flow
1909        */
1910       if (!((socket->write_sequence_number 
1911              - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1912         {
1913           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914                       "%x: Received DATA_ACK with unexpected base sequence "
1915                       "number\n",
1916                       socket->our_id);
1917           return GNUNET_OK;
1918         }
1919       /* FIXME: include the case when write_handle is cancelled - ignore the 
1920          acks */
1921
1922       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1923                   "%x: Received DATA_ACK from %x\n",
1924                   socket->our_id,
1925                   socket->other_peer);
1926       
1927       /* Cancel the retransmission task */
1928       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1929         {
1930           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1931           socket->retransmission_timeout_task_id = 
1932             GNUNET_SCHEDULER_NO_TASK;
1933         }
1934
1935       /* FIXME: Bits in the ack_bitmap are only to be set; Once set they cannot
1936          be unset */
1937       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1938       socket->receiver_window_available = 
1939         ntohl (ack->receive_window_remaining);
1940
1941       /* Check if we have received all acknowledgements */
1942       need_retransmission = GNUNET_NO;
1943       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1944         {
1945           if (NULL == socket->write_handle->messages[packet]) break;
1946           if (GNUNET_YES != ackbitmap_is_bit_set 
1947               (&socket->write_handle->ack_bitmap,packet))
1948             {
1949               need_retransmission = GNUNET_YES;
1950               break;
1951             }
1952         }
1953       if (GNUNET_YES == need_retransmission)
1954         {
1955           write_data (socket);
1956         }
1957       else      /* We have to call the write continuation callback now */
1958         {
1959           /* Free the packets */
1960           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1961             {
1962               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1963             }
1964           if (NULL != socket->write_handle->write_cont)
1965             socket->write_handle->write_cont
1966               (socket->write_handle->write_cont_cls,
1967                socket->status,
1968                socket->write_handle->size);
1969           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1970                       "%x: Write completion callback completed\n",
1971                       socket->our_id);
1972           /* We are done with the write handle - Freeing it */
1973           GNUNET_free (socket->write_handle);
1974           socket->write_handle = NULL;
1975         }
1976       break;
1977     default:
1978       break;
1979     }
1980   return GNUNET_OK;
1981 }
1982
1983
1984 /**
1985  * Message Handler for mesh
1986  *
1987  * @param cls the 'struct GNUNET_STREAM_Socket'
1988  * @param tunnel connection to the other end
1989  * @param tunnel_ctx unused
1990  * @param sender who sent the message
1991  * @param message the actual message
1992  * @param atsi performance data for the connection
1993  * @return GNUNET_OK to keep the connection open,
1994  *         GNUNET_SYSERR to close it (signal serious error)
1995  */
1996 static int
1997 client_handle_ack (void *cls,
1998                    struct GNUNET_MESH_Tunnel *tunnel,
1999                    void **tunnel_ctx,
2000                    const struct GNUNET_PeerIdentity *sender,
2001                    const struct GNUNET_MessageHeader *message,
2002                    const struct GNUNET_ATS_Information*atsi)
2003 {
2004   struct GNUNET_STREAM_Socket *socket = cls;
2005   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2006  
2007   return handle_ack (socket, tunnel, sender, ack, atsi);
2008 }
2009
2010
2011 /**
2012  * Message Handler for mesh
2013  *
2014  * @param cls the server's listen socket
2015  * @param tunnel connection to the other end
2016  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2017  * @param sender who sent the message
2018  * @param message the actual message
2019  * @param atsi performance data for the connection
2020  * @return GNUNET_OK to keep the connection open,
2021  *         GNUNET_SYSERR to close it (signal serious error)
2022  */
2023 static int
2024 server_handle_ack (void *cls,
2025                    struct GNUNET_MESH_Tunnel *tunnel,
2026                    void **tunnel_ctx,
2027                    const struct GNUNET_PeerIdentity *sender,
2028                    const struct GNUNET_MessageHeader *message,
2029                    const struct GNUNET_ATS_Information*atsi)
2030 {
2031   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2032   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2033  
2034   return handle_ack (socket, tunnel, sender, ack, atsi);
2035 }
2036
2037
2038 /**
2039  * For client message handlers, the stream socket is in the
2040  * closure argument.
2041  */
2042 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2043   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2044   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2045    sizeof (struct GNUNET_STREAM_AckMessage) },
2046   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2047    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2048   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2049    sizeof (struct GNUNET_STREAM_MessageHeader)},
2050   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2051    sizeof (struct GNUNET_STREAM_MessageHeader)},
2052   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2053    sizeof (struct GNUNET_STREAM_MessageHeader)},
2054   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2055    sizeof (struct GNUNET_STREAM_MessageHeader)},
2056   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2057    sizeof (struct GNUNET_STREAM_MessageHeader)},
2058   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2059    sizeof (struct GNUNET_STREAM_MessageHeader)},
2060   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2061    sizeof (struct GNUNET_STREAM_MessageHeader)},
2062   {NULL, 0, 0}
2063 };
2064
2065
2066 /**
2067  * For server message handlers, the stream socket is in the
2068  * tunnel context, and the listen socket in the closure argument.
2069  */
2070 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2071   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2072   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2073    sizeof (struct GNUNET_STREAM_AckMessage) },
2074   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2075    sizeof (struct GNUNET_STREAM_MessageHeader)},
2076   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2077    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2078   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2079    sizeof (struct GNUNET_STREAM_MessageHeader)},
2080   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2081    sizeof (struct GNUNET_STREAM_MessageHeader)},
2082   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2083    sizeof (struct GNUNET_STREAM_MessageHeader)},
2084   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2085    sizeof (struct GNUNET_STREAM_MessageHeader)},
2086   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2087    sizeof (struct GNUNET_STREAM_MessageHeader)},
2088   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2089    sizeof (struct GNUNET_STREAM_MessageHeader)},
2090   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2091    sizeof (struct GNUNET_STREAM_MessageHeader)},
2092   {NULL, 0, 0}
2093 };
2094
2095
2096 /**
2097  * Function called when our target peer is connected to our tunnel
2098  *
2099  * @param cls the socket for which this tunnel is created
2100  * @param peer the peer identity of the target
2101  * @param atsi performance data for the connection
2102  */
2103 static void
2104 mesh_peer_connect_callback (void *cls,
2105                             const struct GNUNET_PeerIdentity *peer,
2106                             const struct GNUNET_ATS_Information * atsi)
2107 {
2108   struct GNUNET_STREAM_Socket *socket = cls;
2109   struct GNUNET_STREAM_MessageHeader *message;
2110   GNUNET_PEER_Id connected_peer;
2111
2112   connected_peer = GNUNET_PEER_search (peer);
2113   
2114   if (connected_peer != socket->other_peer)
2115     {
2116       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2117                   "%x: A peer which is not our target has connected",
2118                   "to our tunnel\n",
2119                   socket->our_id);
2120       return;
2121     }
2122   
2123   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2124               "%x: Target peer %x connected\n", 
2125               socket->our_id,
2126               connected_peer);
2127   
2128   /* Set state to INIT */
2129   socket->state = STATE_INIT;
2130
2131   /* Send HELLO message */
2132   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2133   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2134   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2135   queue_message (socket,
2136                  message,
2137                  &set_state_hello_wait,
2138                  NULL);
2139
2140   /* Call open callback */
2141   if (NULL == socket->open_cb)
2142     {
2143       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2144                   "STREAM_open callback is NULL\n");
2145     }
2146 }
2147
2148
2149 /**
2150  * Function called when our target peer is disconnected from our tunnel
2151  *
2152  * @param cls the socket associated which this tunnel
2153  * @param peer the peer identity of the target
2154  */
2155 static void
2156 mesh_peer_disconnect_callback (void *cls,
2157                                const struct GNUNET_PeerIdentity *peer)
2158 {
2159
2160 }
2161
2162
2163 /**
2164  * Method called whenever a peer creates a tunnel to us
2165  *
2166  * @param cls closure
2167  * @param tunnel new handle to the tunnel
2168  * @param initiator peer that started the tunnel
2169  * @param atsi performance information for the tunnel
2170  * @return initial tunnel context for the tunnel
2171  *         (can be NULL -- that's not an error)
2172  */
2173 static void *
2174 new_tunnel_notify (void *cls,
2175                    struct GNUNET_MESH_Tunnel *tunnel,
2176                    const struct GNUNET_PeerIdentity *initiator,
2177                    const struct GNUNET_ATS_Information *atsi)
2178 {
2179   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2180   struct GNUNET_STREAM_Socket *socket;
2181
2182   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2183      from the same peer again until the socket is closed */
2184
2185   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2186   socket->other_peer = GNUNET_PEER_intern (initiator);
2187   socket->tunnel = tunnel;
2188   socket->session_id = 0;       /* FIXME */
2189   socket->state = STATE_INIT;
2190   socket->lsocket = lsocket;
2191   socket->our_id = lsocket->our_id;
2192   
2193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2194               "%x: Peer %x initiated tunnel to us\n", 
2195               socket->our_id,
2196               socket->other_peer);
2197   
2198   /* FIXME: Copy MESH handle from lsocket to socket */
2199   
2200   return socket;
2201 }
2202
2203
2204 /**
2205  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2206  * any associated state.  This function is NOT called if the client has
2207  * explicitly asked for the tunnel to be destroyed using
2208  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2209  * the tunnel.
2210  *
2211  * @param cls closure (set from GNUNET_MESH_connect)
2212  * @param tunnel connection to the other end (henceforth invalid)
2213  * @param tunnel_ctx place where local state associated
2214  *                   with the tunnel is stored
2215  */
2216 static void 
2217 tunnel_cleaner (void *cls,
2218                 const struct GNUNET_MESH_Tunnel *tunnel,
2219                 void *tunnel_ctx)
2220 {
2221   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2222   
2223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2224               "%x: Peer %x has terminated connection abruptly\n",
2225               socket->our_id,
2226               socket->other_peer);
2227
2228   socket->status = GNUNET_STREAM_SHUTDOWN;
2229
2230   /* Clear Transmit handles */
2231   if (NULL != socket->transmit_handle)
2232     {
2233       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2234       socket->transmit_handle = NULL;
2235     }
2236   socket->tunnel = NULL;
2237 }
2238
2239
2240 /*****************/
2241 /* API functions */
2242 /*****************/
2243
2244
2245 /**
2246  * Tries to open a stream to the target peer
2247  *
2248  * @param cfg configuration to use
2249  * @param target the target peer to which the stream has to be opened
2250  * @param app_port the application port number which uniquely identifies this
2251  *            stream
2252  * @param open_cb this function will be called after stream has be established 
2253  * @param open_cb_cls the closure for open_cb
2254  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2255  * @return if successful it returns the stream socket; NULL if stream cannot be
2256  *         opened 
2257  */
2258 struct GNUNET_STREAM_Socket *
2259 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2260                     const struct GNUNET_PeerIdentity *target,
2261                     GNUNET_MESH_ApplicationType app_port,
2262                     GNUNET_STREAM_OpenCallback open_cb,
2263                     void *open_cb_cls,
2264                     ...)
2265 {
2266   struct GNUNET_STREAM_Socket *socket;
2267   struct GNUNET_PeerIdentity own_peer_id;
2268   enum GNUNET_STREAM_Option option;
2269   va_list vargs;                /* Variable arguments */
2270
2271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2272               "%s\n", __func__);
2273
2274   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2275   socket->other_peer = GNUNET_PEER_intern (target);
2276   socket->open_cb = open_cb;
2277   socket->open_cls = open_cb_cls;
2278   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2279   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2280   
2281   /* Set defaults */
2282   socket->retransmit_timeout = 
2283     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2284
2285   va_start (vargs, open_cb_cls); /* Parse variable args */
2286   do {
2287     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2288     switch (option)
2289       {
2290       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2291         /* Expect struct GNUNET_TIME_Relative */
2292         socket->retransmit_timeout = va_arg (vargs,
2293                                              struct GNUNET_TIME_Relative);
2294         break;
2295       case GNUNET_STREAM_OPTION_END:
2296         break;
2297       }
2298   } while (GNUNET_STREAM_OPTION_END != option);
2299   va_end (vargs);               /* End of variable args parsing */
2300   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2301                                       10,  /* QUEUE size as parameter? */
2302                                       socket, /* cls */
2303                                       NULL, /* No inbound tunnel handler */
2304                                       &tunnel_cleaner, /* FIXME: not required? */
2305                                       client_message_handlers,
2306                                       &app_port); /* We don't get inbound tunnels */
2307   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2308     {
2309       GNUNET_free (socket);
2310       return NULL;
2311     }
2312
2313   /* Now create the mesh tunnel to target */
2314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2315               "Creating MESH Tunnel\n");
2316   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2317                                               NULL, /* Tunnel context */
2318                                               &mesh_peer_connect_callback,
2319                                               &mesh_peer_disconnect_callback,
2320                                               socket);
2321   GNUNET_assert (NULL != socket->tunnel);
2322   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2323                                         target);
2324   
2325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2326               "%s() END\n", __func__);
2327   return socket;
2328 }
2329
2330
2331 /**
2332  * Shutdown the stream for reading or writing (man 2 shutdown).
2333  *
2334  * @param socket the stream socket
2335  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2336  */
2337 void
2338 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2339                         int how)
2340 {
2341   return;
2342 }
2343
2344
2345 /**
2346  * Closes the stream
2347  *
2348  * @param socket the stream socket
2349  */
2350 void
2351 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2352 {
2353   struct MessageQueue *head;
2354
2355   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2356   {
2357     /* socket closed with read task pending!? */
2358     GNUNET_break (0);
2359     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2360     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2361   }
2362
2363   /* Clear Transmit handles */
2364   if (NULL != socket->transmit_handle)
2365     {
2366       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2367       socket->transmit_handle = NULL;
2368     }
2369
2370   /* Clear existing message queue */
2371   while (NULL != (head = socket->queue_head)) {
2372     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2373                                  socket->queue_tail,
2374                                  head);
2375     GNUNET_free (head->message);
2376     GNUNET_free (head);
2377   }
2378
2379   /* Close associated tunnel */
2380   if (NULL != socket->tunnel)
2381     {
2382       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2383       socket->tunnel = NULL;
2384     }
2385
2386   /* Close mesh connection */
2387   if (NULL != socket->mesh && NULL == socket->lsocket)
2388     {
2389       GNUNET_MESH_disconnect (socket->mesh);
2390       socket->mesh = NULL;
2391     }
2392   
2393   /* Release receive buffer */
2394   if (NULL != socket->receive_buffer)
2395     {
2396       GNUNET_free (socket->receive_buffer);
2397     }
2398
2399   GNUNET_free (socket);
2400 }
2401
2402
2403 /**
2404  * Listens for stream connections for a specific application ports
2405  *
2406  * @param cfg the configuration to use
2407  * @param app_port the application port for which new streams will be accepted
2408  * @param listen_cb this function will be called when a peer tries to establish
2409  *            a stream with us
2410  * @param listen_cb_cls closure for listen_cb
2411  * @return listen socket, NULL for any error
2412  */
2413 struct GNUNET_STREAM_ListenSocket *
2414 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2415                       GNUNET_MESH_ApplicationType app_port,
2416                       GNUNET_STREAM_ListenCallback listen_cb,
2417                       void *listen_cb_cls)
2418 {
2419   /* FIXME: Add variable args for passing configration options? */
2420   struct GNUNET_STREAM_ListenSocket *lsocket;
2421   struct GNUNET_PeerIdentity our_peer_id;
2422
2423   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2424   lsocket->port = app_port;
2425   lsocket->listen_cb = listen_cb;
2426   lsocket->listen_cb_cls = listen_cb_cls;
2427   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2428   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2429   lsocket->mesh = GNUNET_MESH_connect (cfg,
2430                                        10, /* FIXME: QUEUE size as parameter? */
2431                                        lsocket, /* Closure */
2432                                        &new_tunnel_notify,
2433                                        &tunnel_cleaner,
2434                                        server_message_handlers,
2435                                        &app_port);
2436   GNUNET_assert (NULL != lsocket->mesh);
2437   return lsocket;
2438 }
2439
2440
2441 /**
2442  * Closes the listen socket
2443  *
2444  * @param lsocket the listen socket
2445  */
2446 void
2447 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2448 {
2449   /* Close MESH connection */
2450   GNUNET_assert (NULL != lsocket->mesh);
2451   GNUNET_MESH_disconnect (lsocket->mesh);
2452   
2453   GNUNET_free (lsocket);
2454 }
2455
2456
2457 /**
2458  * Tries to write the given data to the stream
2459  *
2460  * @param socket the socket representing a stream
2461  * @param data the data buffer from where the data is written into the stream
2462  * @param size the number of bytes to be written from the data buffer
2463  * @param timeout the timeout period
2464  * @param write_cont the function to call upon writing some bytes into the stream
2465  * @param write_cont_cls the closure
2466  * @return handle to cancel the operation
2467  */
2468 struct GNUNET_STREAM_IOWriteHandle *
2469 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2470                      const void *data,
2471                      size_t size,
2472                      struct GNUNET_TIME_Relative timeout,
2473                      GNUNET_STREAM_CompletionContinuation write_cont,
2474                      void *write_cont_cls)
2475 {
2476   unsigned int num_needed_packets;
2477   unsigned int packet;
2478   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2479   uint32_t packet_size;
2480   uint32_t payload_size;
2481   struct GNUNET_STREAM_DataMessage *data_msg;
2482   const void *sweep;
2483   struct GNUNET_TIME_Relative ack_deadline;
2484
2485   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2486               "%s\n", __func__);
2487
2488   /* Return NULL if there is already a write request pending */
2489   if (NULL != socket->write_handle)
2490   {
2491     GNUNET_break (0);
2492     return NULL;
2493   }
2494   if (!((STATE_ESTABLISHED == socket->state)
2495         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2496         || (STATE_RECEIVE_CLOSED == socket->state)))
2497     {
2498       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2499                   "%x: Attempting to write on a closed (OR) not-yet-established"
2500                   "stream\n",
2501                   socket->our_id);
2502       return NULL;
2503     } 
2504   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2505     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2506   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2507   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2508   io_handle->write_cont = write_cont;
2509   io_handle->write_cont_cls = write_cont_cls;
2510   io_handle->size = size;
2511   sweep = data;
2512   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2513      determined from RTT */
2514   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2515   /* Divide the given buffer into packets for sending */
2516   for (packet=0; packet < num_needed_packets; packet++)
2517     {
2518       if ((packet + 1) * max_payload_size < size) 
2519         {
2520           payload_size = max_payload_size;
2521           packet_size = MAX_PACKET_SIZE;
2522         }
2523       else 
2524         {
2525           payload_size = size - packet * max_payload_size;
2526           packet_size =  payload_size + sizeof (struct
2527                                                 GNUNET_STREAM_DataMessage); 
2528         }
2529       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2530       io_handle->messages[packet]->header.header.size = htons (packet_size);
2531       io_handle->messages[packet]->header.header.type =
2532         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2533       io_handle->messages[packet]->sequence_number =
2534         htonl (socket->write_sequence_number++);
2535       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2536
2537       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2538          determined from RTT */
2539       io_handle->messages[packet]->ack_deadline =
2540         GNUNET_TIME_relative_hton (ack_deadline);
2541       data_msg = io_handle->messages[packet];
2542       /* Copy data from given buffer to the packet */
2543       memcpy (&data_msg[1],
2544               sweep,
2545               payload_size);
2546       sweep += payload_size;
2547       socket->write_offset += payload_size;
2548     }
2549   socket->write_handle = io_handle;
2550   write_data (socket);
2551
2552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2553               "%s() END\n", __func__);
2554
2555   return io_handle;
2556 }
2557
2558
2559 /**
2560  * Tries to read data from the stream
2561  *
2562  * @param socket the socket representing a stream
2563  * @param timeout the timeout period
2564  * @param proc function to call with data (once only)
2565  * @param proc_cls the closure for proc
2566  * @return handle to cancel the operation
2567  */
2568 struct GNUNET_STREAM_IOReadHandle *
2569 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2570                     struct GNUNET_TIME_Relative timeout,
2571                     GNUNET_STREAM_DataProcessor proc,
2572                     void *proc_cls)
2573 {
2574   struct GNUNET_STREAM_IOReadHandle *read_handle;
2575   
2576   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2577               "%x: %s()\n", 
2578               socket->our_id,
2579               __func__);
2580
2581   /* Return NULL if there is already a read handle; the user has to cancel that
2582   first before continuing or has to wait until it is completed */
2583   if (NULL != socket->read_handle) return NULL;
2584
2585   GNUNET_assert (NULL != proc);
2586
2587   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2588   read_handle->proc = proc;
2589   read_handle->proc_cls = proc_cls;
2590   socket->read_handle = read_handle;
2591
2592   /* Check if we have a packet at bitmap 0 */
2593   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2594                                           0))
2595     {
2596       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2597                                                        socket);
2598    
2599     }
2600   
2601   /* Setup the read timeout task */
2602   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2603                                                                &read_io_timeout,
2604                                                                socket);
2605   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2606               "%x: %s() END\n",
2607               socket->our_id,
2608               __func__);
2609   return read_handle;
2610 }
2611
2612
2613 /**
2614  * Cancel pending write operation.
2615  *
2616  * @param ioh handle to operation to cancel
2617  */
2618 void
2619 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2620 {
2621   /* FIXME: Should cancel the write retransmission task */
2622   return;
2623 }
2624
2625
2626 /**
2627  * Cancel pending read operation.
2628  *
2629  * @param ioh handle to operation to cancel
2630  */
2631 void
2632 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2633 {
2634   return;
2635 }