fixed retransmission task
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * Task identifier for the read io timeout task
235    */
236   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
237
238   /**
239    * Task identifier for retransmission task after timeout
240    */
241   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
242
243   /**
244    * The task for sending timely Acks
245    */
246   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
247
248   /**
249    * Task scheduled to continue a read operation.
250    */
251   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
252
253   /**
254    * The state of the protocol associated with this socket
255    */
256   enum State state;
257
258   /**
259    * The status of the socket
260    */
261   enum GNUNET_STREAM_Status status;
262
263   /**
264    * The number of previous timeouts; FIXME: currently not used
265    */
266   unsigned int retries;
267
268   /**
269    * Is this socket derived from listen socket?
270    */
271   unsigned int derived;
272   
273   /**
274    * The peer identity of the peer at the other end of the stream
275    */
276   GNUNET_PEER_Id other_peer;
277
278   /**
279    * Our Peer Identity (for debugging)
280    */
281   GNUNET_PEER_Id our_id;
282
283   /**
284    * The application port number (type: uint32_t)
285    */
286   GNUNET_MESH_ApplicationType app_port;
287
288   /**
289    * The session id associated with this stream connection
290    * FIXME: Not used currently, may be removed
291    */
292   uint32_t session_id;
293
294   /**
295    * Write sequence number. Set to random when sending HELLO(client) and
296    * HELLO_ACK(server) 
297    */
298   uint32_t write_sequence_number;
299
300   /**
301    * Read sequence number. This number's value is determined during handshake
302    */
303   uint32_t read_sequence_number;
304
305   /**
306    * The receiver buffer size
307    */
308   uint32_t receive_buffer_size;
309
310   /**
311    * The receiver buffer boundaries
312    */
313   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
314
315   /**
316    * receiver's available buffer after the last acknowledged packet
317    */
318   uint32_t receiver_window_available;
319
320   /**
321    * The offset pointer used during write operation
322    */
323   uint32_t write_offset;
324
325   /**
326    * The offset after which we are expecting data
327    */
328   uint32_t read_offset;
329
330   /**
331    * The offset upto which user has read from the received buffer
332    */
333   uint32_t copy_offset;
334 };
335
336
337 /**
338  * A socket for listening
339  */
340 struct GNUNET_STREAM_ListenSocket
341 {
342   /**
343    * The mesh handle
344    */
345   struct GNUNET_MESH_Handle *mesh;
346
347   /**
348    * The callback function which is called after successful opening socket
349    */
350   GNUNET_STREAM_ListenCallback listen_cb;
351
352   /**
353    * The call back closure
354    */
355   void *listen_cb_cls;
356
357   /**
358    * Our interned Peer's identity
359    */
360   GNUNET_PEER_Id our_id;
361
362   /**
363    * The service port
364    * FIXME: Remove if not required!
365    */
366   GNUNET_MESH_ApplicationType port;
367 };
368
369
370 /**
371  * The IO Write Handle
372  */
373 struct GNUNET_STREAM_IOWriteHandle
374 {
375   /**
376    * The packet_buffers associated with this Handle
377    */
378   struct GNUNET_STREAM_DataMessage *messages[64];
379
380   /**
381    * The write continuation callback
382    */
383   GNUNET_STREAM_CompletionContinuation write_cont;
384
385   /**
386    * Write continuation closure
387    */
388   void *write_cont_cls;
389
390   /**
391    * The bitmap of this IOHandle; Corresponding bit for a message is set when
392    * it has been acknowledged by the receiver
393    */
394   GNUNET_STREAM_AckBitmap ack_bitmap;
395
396   /**
397    * Number of bytes in this write handle
398    */
399   size_t size;
400
401   /**
402    * Number of packets sent before waiting for an ack
403    *
404    * FIXME: Do we need this?
405    */
406   unsigned int sent_packets;
407 };
408
409
410 /**
411  * The IO Read Handle
412  */
413 struct GNUNET_STREAM_IOReadHandle
414 {
415   /**
416    * Callback for the read processor
417    */
418   GNUNET_STREAM_DataProcessor proc;
419
420   /**
421    * The closure pointer for the read processor callback
422    */
423   void *proc_cls;
424 };
425
426
427 /**
428  * Default value in seconds for various timeouts
429  */
430 static unsigned int default_timeout = 10;
431
432 /**
433  * Callback function for sending hello message
434  *
435  * @param cls closure the socket
436  * @param size number of bytes available in buf
437  * @param buf where the callee should write the message
438  * @return number of bytes written to buf
439  */
440 static size_t
441 send_message_notify (void *cls, size_t size, void *buf)
442 {
443   struct GNUNET_STREAM_Socket *socket = cls;
444   struct GNUNET_PeerIdentity target;
445   struct MessageQueue *head;
446   size_t ret;
447
448   socket->transmit_handle = NULL; /* Remove the transmit handle */
449   head = socket->queue_head;
450   if (NULL == head)
451     return 0; /* just to be safe */
452   GNUNET_PEER_resolve (socket->other_peer, &target);
453   if (0 == size)                /* request timed out */
454     {
455       socket->retries++;
456       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457                   "Message sending timed out. Retry %d \n",
458                   socket->retries);
459       socket->transmit_handle = 
460         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
461                                            0, /* Corking */
462                                            1, /* Priority */
463                                            /* FIXME: exponential backoff */
464                                            socket->retransmit_timeout,
465                                            &target,
466                                            ntohs (head->message->header.size),
467                                            &send_message_notify,
468                                            socket);
469       return 0;
470     }
471
472   ret = ntohs (head->message->header.size);
473   GNUNET_assert (size >= ret);
474   memcpy (buf, head->message, ret);
475   if (NULL != head->finish_cb)
476     {
477       head->finish_cb (head->finish_cb_cls, socket);
478     }
479   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
480                                socket->queue_tail,
481                                head);
482   GNUNET_free (head->message);
483   GNUNET_free (head);
484   head = socket->queue_head;
485   if (NULL != head)    /* more pending messages to send */
486     {
487       socket->retries = 0;
488       socket->transmit_handle = 
489         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
490                                            0, /* Corking */
491                                            1, /* Priority */
492                                            /* FIXME: exponential backoff */
493                                            socket->retransmit_timeout,
494                                            &target,
495                                            ntohs (head->message->header.size),
496                                            &send_message_notify,
497                                            socket);
498     }
499   return ret;
500 }
501
502
503 /**
504  * Queues a message for sending using the mesh connection of a socket
505  *
506  * @param socket the socket whose mesh connection is used
507  * @param message the message to be sent
508  * @param finish_cb the callback to be called when the message is sent
509  * @param finish_cb_cls the closure for the callback
510  */
511 static void
512 queue_message (struct GNUNET_STREAM_Socket *socket,
513                struct GNUNET_STREAM_MessageHeader *message,
514                SendFinishCallback finish_cb,
515                void *finish_cb_cls)
516 {
517   struct MessageQueue *queue_entity;
518   struct GNUNET_PeerIdentity target;
519
520   GNUNET_assert 
521     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
522      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
523
524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525               "%x: Queueing message of type %d and size %d\n",
526               socket->our_id,
527               ntohs (message->header.type),
528               ntohs (message->header.size));
529   GNUNET_assert (NULL != message);
530   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
531   queue_entity->message = message;
532   queue_entity->finish_cb = finish_cb;
533   queue_entity->finish_cb_cls = finish_cb_cls;
534   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
535                                     socket->queue_tail,
536                                     queue_entity);
537   if (NULL == socket->transmit_handle)
538   {
539     socket->retries = 0;
540     GNUNET_PEER_resolve (socket->other_peer, &target);
541     socket->transmit_handle = 
542       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
543                                          0, /* Corking */
544                                          1, /* Priority */
545                                          socket->retransmit_timeout,
546                                          &target,
547                                          ntohs (message->header.size),
548                                          &send_message_notify,
549                                          socket);
550   }
551 }
552
553
554 /**
555  * Copies a message and queues it for sending using the mesh connection of
556  * given socket 
557  *
558  * @param socket the socket whose mesh connection is used
559  * @param message the message to be sent
560  * @param finish_cb the callback to be called when the message is sent
561  * @param finish_cb_cls the closure for the callback
562  */
563 static void
564 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
565                         const struct GNUNET_STREAM_MessageHeader *message,
566                         SendFinishCallback finish_cb,
567                         void *finish_cb_cls)
568 {
569   struct GNUNET_STREAM_MessageHeader *msg_copy;
570   uint16_t size;
571   
572   size = ntohs (message->header.size);
573   msg_copy = GNUNET_malloc (size);
574   memcpy (msg_copy, message, size);
575   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
576 }
577
578
579 /**
580  * Callback function for sending ack message
581  *
582  * @param cls closure the ACK message created in ack_task
583  * @param size number of bytes available in buffer
584  * @param buf where the callee should write the message
585  * @return number of bytes written to buf
586  */
587 static size_t
588 send_ack_notify (void *cls, size_t size, void *buf)
589 {
590   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
591
592   if (0 == size)
593     {
594       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595                   "%s called with size 0\n", __func__);
596       return 0;
597     }
598   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
599   
600   size = ntohs (ack_msg->header.header.size);
601   memcpy (buf, ack_msg, size);
602   return size;
603 }
604
605 /**
606  * Writes data using the given socket. The amount of data written is limited by
607  * the receiver_window_size
608  *
609  * @param socket the socket to use
610  */
611 static void 
612 write_data (struct GNUNET_STREAM_Socket *socket);
613
614 /**
615  * Task for retransmitting data messages if they aren't ACK before their ack
616  * deadline 
617  *
618  * @param cls the socket
619  * @param tc the Task context
620  */
621 static void
622 retransmission_timeout_task (void *cls,
623                              const struct GNUNET_SCHEDULER_TaskContext *tc)
624 {
625   struct GNUNET_STREAM_Socket *socket = cls;
626   
627   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
628     return;
629
630   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
631               "%x: Retransmitting DATA...\n", socket->our_id);
632   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
633   write_data (socket);
634 }
635
636
637 /**
638  * Task for sending ACK message
639  *
640  * @param cls the socket
641  * @param tc the Task context
642  */
643 static void
644 ack_task (void *cls,
645           const struct GNUNET_SCHEDULER_TaskContext *tc)
646 {
647   struct GNUNET_STREAM_Socket *socket = cls;
648   struct GNUNET_STREAM_AckMessage *ack_msg;
649   struct GNUNET_PeerIdentity target;
650
651   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
652     {
653       return;
654     }
655
656   socket->ack_task_id = 0;
657
658   /* Create the ACK Message */
659   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
660   ack_msg->header.header.size = htons (sizeof (struct 
661                                                GNUNET_STREAM_AckMessage));
662   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
663   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
664   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
665   ack_msg->receive_window_remaining = 
666     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
667
668   GNUNET_PEER_resolve (socket->other_peer, &target);
669   /* Request MESH for sending ACK */
670   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
671                                      0, /* Corking */
672                                      1, /* Priority */
673                                      socket->retransmit_timeout,
674                                      &target,
675                                      ntohs (ack_msg->header.header.size),
676                                      &send_ack_notify,
677                                      ack_msg);
678
679   
680 }
681
682
683 /**
684  * Function to modify a bit in GNUNET_STREAM_AckBitmap
685  *
686  * @param bitmap the bitmap to modify
687  * @param bit the bit number to modify
688  * @param value GNUNET_YES to on, GNUNET_NO to off
689  */
690 static void
691 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
692                       unsigned int bit, 
693                       int value)
694 {
695   GNUNET_assert (bit < 64);
696   if (GNUNET_YES == value)
697     *bitmap |= (1LL << bit);
698   else
699     *bitmap &= ~(1LL << bit);
700 }
701
702
703 /**
704  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
705  *
706  * @param bitmap address of the bitmap that has to be checked
707  * @param bit the bit number to check
708  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
709  */
710 static uint8_t
711 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
712                       unsigned int bit)
713 {
714   GNUNET_assert (bit < 64);
715   return 0 != (*bitmap & (1LL << bit));
716 }
717
718
719
720 /**
721  * Function called when Data Message is sent
722  *
723  * @param cls the io_handle corresponding to the Data Message
724  * @param socket the socket which was used
725  */
726 static void
727 write_data_finish_cb (void *cls,
728                       struct GNUNET_STREAM_Socket *socket)
729 {
730   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
731
732   io_handle->sent_packets++;
733 }
734
735
736 /**
737  * Writes data using the given socket. The amount of data written is limited by
738  * the receiver_window_size
739  *
740  * @param socket the socket to use
741  */
742 static void 
743 write_data (struct GNUNET_STREAM_Socket *socket)
744 {
745   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
746   int packet;                   /* Although an int, should never be negative */
747   int ack_packet;
748
749   ack_packet = -1;
750   /* Find the last acknowledged packet */
751   for (packet=0; packet < 64; packet++)
752     {      
753       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
754                                               packet))
755         ack_packet = packet;        
756       else if (NULL == io_handle->messages[packet])
757         break;
758     }
759   /* Resend packets which weren't ack'ed */
760   for (packet=0; packet < ack_packet; packet++)
761     {
762       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
763                                              packet))
764         {
765           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766                       "%x: Placing DATA message with sequence %u in send queue\n",
767                       socket->our_id,
768                       ntohl (io_handle->messages[packet]->sequence_number));
769
770           copy_and_queue_message (socket,
771                                   &io_handle->messages[packet]->header,
772                                   NULL,
773                                   NULL);
774         }
775     }
776   packet = ack_packet + 1;
777   /* Now send new packets if there is enough buffer space */
778   while ( (NULL != io_handle->messages[packet]) &&
779           (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
780     {
781       socket->receiver_window_available -= 
782         ntohs (io_handle->messages[packet]->header.header.size);
783       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784                   "%x: Placing DATA message with sequence %u in send queue\n",
785                   socket->our_id,
786                   ntohl (io_handle->messages[packet]->sequence_number));
787       copy_and_queue_message (socket,
788                               &io_handle->messages[packet]->header,
789                               &write_data_finish_cb,
790                               io_handle);
791       packet++;
792     }
793
794   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
795     socket->retransmission_timeout_task_id = 
796       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
797                                     (GNUNET_TIME_UNIT_SECONDS, 5),
798                                     &retransmission_timeout_task,
799                                     socket);
800 }
801
802
803 /**
804  * Task for calling the read processor
805  *
806  * @param cls the socket
807  * @param tc the task context
808  */
809 static void
810 call_read_processor (void *cls,
811                           const struct GNUNET_SCHEDULER_TaskContext *tc)
812 {
813   struct GNUNET_STREAM_Socket *socket = cls;
814   size_t read_size;
815   size_t valid_read_size;
816   unsigned int packet;
817   uint32_t sequence_increase;
818   uint32_t offset_increase;
819
820   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
821   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
822     return;
823
824   GNUNET_assert (NULL != socket->read_handle);
825   GNUNET_assert (NULL != socket->read_handle->proc);
826
827   /* Check the bitmap for any holes */
828   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
829     {
830       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
831                                              packet))
832         break;
833     }
834   /* We only call read processor if we have the first packet */
835   GNUNET_assert (0 < packet);
836
837   valid_read_size = 
838     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
839
840   GNUNET_assert (0 != valid_read_size);
841
842   /* Cancel the read_io_timeout_task */
843   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
844   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
845
846   /* Call the data processor */
847   read_size = 
848     socket->read_handle->proc (socket->read_handle->proc_cls,
849                                socket->status,
850                                socket->receive_buffer + socket->copy_offset,
851                                valid_read_size);
852   /* Free the read handle */
853   GNUNET_free (socket->read_handle);
854   socket->read_handle = NULL;
855
856   GNUNET_assert (read_size <= valid_read_size);
857   socket->copy_offset += read_size;
858
859   /* Determine upto which packet we can remove from the buffer */
860   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
861     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
862       break;
863
864   /* If no packets can be removed we can't move the buffer */
865   if (0 == packet) return;
866
867   sequence_increase = packet;
868
869   /* Shift the data in the receive buffer */
870   memmove (socket->receive_buffer,
871            socket->receive_buffer 
872            + socket->receive_buffer_boundaries[sequence_increase-1],
873            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
874   
875   /* Shift the bitmap */
876   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
877   
878   /* Set read_sequence_number */
879   socket->read_sequence_number += sequence_increase;
880   
881   /* Set read_offset */
882   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
883   socket->read_offset += offset_increase;
884
885   /* Fix copy_offset */
886   GNUNET_assert (offset_increase <= socket->copy_offset);
887   socket->copy_offset -= offset_increase;
888   
889   /* Fix relative boundaries */
890   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
891     {
892       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
893         {
894           socket->receive_buffer_boundaries[packet] = 
895             socket->receive_buffer_boundaries[packet + sequence_increase] 
896             - offset_increase;
897         }
898       else
899         socket->receive_buffer_boundaries[packet] = 0;
900     }
901 }
902
903
904 /**
905  * Cancels the existing read io handle
906  *
907  * @param cls the closure from the SCHEDULER call
908  * @param tc the task context
909  */
910 static void
911 read_io_timeout (void *cls, 
912                 const struct GNUNET_SCHEDULER_TaskContext *tc)
913 {
914   struct GNUNET_STREAM_Socket *socket = cls;
915
916   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
917   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
918   {
919     GNUNET_SCHEDULER_cancel (socket->read_task_id);
920     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
921   }
922   GNUNET_assert (NULL != socket->read_handle);
923   
924   GNUNET_free (socket->read_handle);
925   socket->read_handle = NULL;
926 }
927
928
929 /**
930  * Handler for DATA messages; Same for both client and server
931  *
932  * @param socket the socket through which the ack was received
933  * @param tunnel connection to the other end
934  * @param sender who sent the message
935  * @param msg the data message
936  * @param atsi performance data for the connection
937  * @return GNUNET_OK to keep the connection open,
938  *         GNUNET_SYSERR to close it (signal serious error)
939  */
940 static int
941 handle_data (struct GNUNET_STREAM_Socket *socket,
942              struct GNUNET_MESH_Tunnel *tunnel,
943              const struct GNUNET_PeerIdentity *sender,
944              const struct GNUNET_STREAM_DataMessage *msg,
945              const struct GNUNET_ATS_Information*atsi)
946 {
947   const void *payload;
948   uint32_t bytes_needed;
949   uint32_t relative_offset;
950   uint32_t relative_sequence_number;
951   uint16_t size;
952
953   size = htons (msg->header.header.size);
954   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
955     {
956       GNUNET_break_op (0);
957       return GNUNET_SYSERR;
958     }
959
960   if (GNUNET_PEER_search (sender) != socket->other_peer)
961     {
962       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963                   "%x: Received DATA from non-confirming peer\n",
964                   socket->our_id);
965       return GNUNET_YES;
966     }
967
968   switch (socket->state)
969     {
970     case STATE_ESTABLISHED:
971     case STATE_TRANSMIT_CLOSED:
972     case STATE_TRANSMIT_CLOSE_WAIT:
973       
974       /* check if the message's sequence number is in the range we are
975          expecting */
976       relative_sequence_number = 
977         ntohl (msg->sequence_number) - socket->read_sequence_number;
978       if ( relative_sequence_number > 64)
979         {
980           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
981                       "%x: Ignoring received message with sequence number %u\n",
982                       socket->our_id,
983                       ntohl (msg->sequence_number));
984           return GNUNET_YES;
985         }
986
987       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
988                   "%x: Receiving DATA with sequence number: %u and size: %d from %x\n",
989                   socket->our_id,
990                   ntohl (msg->sequence_number),
991                   ntohs (msg->header.header.size),
992                   socket->other_peer);
993
994       /* Check if we have to allocate the buffer */
995       size -= sizeof (struct GNUNET_STREAM_DataMessage);
996       relative_offset = ntohl (msg->offset) - socket->read_offset;
997       bytes_needed = relative_offset + size;
998       
999       if (bytes_needed > socket->receive_buffer_size)
1000         {
1001           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1002             {
1003               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1004                                                        bytes_needed);
1005               socket->receive_buffer_size = bytes_needed;
1006             }
1007           else
1008             {
1009               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                           "%x: Cannot accommodate packet %d as buffer is",
1011                           "full\n",
1012                           socket->our_id,
1013                           ntohl (msg->sequence_number));
1014               return GNUNET_YES;
1015             }
1016         }
1017       
1018       /* Copy Data to buffer */
1019       payload = &msg[1];
1020       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1021       memcpy (socket->receive_buffer + relative_offset,
1022               payload,
1023               size);
1024       socket->receive_buffer_boundaries[relative_sequence_number] = 
1025         relative_offset + size;
1026       
1027       /* Modify the ACK bitmap */
1028       ackbitmap_modify_bit (&socket->ack_bitmap,
1029                             relative_sequence_number,
1030                             GNUNET_YES);
1031
1032       /* Start ACK sending task if one is not already present */
1033       if (0 == socket->ack_task_id)
1034        {
1035          socket->ack_task_id = 
1036            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1037                                          (msg->ack_deadline),
1038                                          &ack_task,
1039                                          socket);
1040        }
1041
1042       if ((NULL != socket->read_handle) /* A read handle is waiting */
1043           /* There is no current read task */
1044           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1045           /* We have the first packet */
1046           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1047                                                  0)))
1048         {
1049           socket->read_task_id = 
1050             GNUNET_SCHEDULER_add_now (&call_read_processor,
1051                                       socket);
1052         }
1053       
1054       break;
1055
1056     default:
1057       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058                   "%x: Received data message when it cannot be handled\n",
1059                   socket->our_id);
1060       break;
1061     }
1062   return GNUNET_YES;
1063 }
1064
1065 /**
1066  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1067  *
1068  * @param cls the socket (set from GNUNET_MESH_connect)
1069  * @param tunnel connection to the other end
1070  * @param tunnel_ctx place to store local state associated with the tunnel
1071  * @param sender who sent the message
1072  * @param message the actual message
1073  * @param atsi performance data for the connection
1074  * @return GNUNET_OK to keep the connection open,
1075  *         GNUNET_SYSERR to close it (signal serious error)
1076  */
1077 static int
1078 client_handle_data (void *cls,
1079              struct GNUNET_MESH_Tunnel *tunnel,
1080              void **tunnel_ctx,
1081              const struct GNUNET_PeerIdentity *sender,
1082              const struct GNUNET_MessageHeader *message,
1083              const struct GNUNET_ATS_Information*atsi)
1084 {
1085   struct GNUNET_STREAM_Socket *socket = cls;
1086
1087   return handle_data (socket, 
1088                       tunnel, 
1089                       sender, 
1090                       (const struct GNUNET_STREAM_DataMessage *) message, 
1091                       atsi);
1092 }
1093
1094
1095 /**
1096  * Callback to set state to ESTABLISHED
1097  *
1098  * @param cls the closure from queue_message FIXME: document
1099  * @param socket the socket to requiring state change
1100  */
1101 static void
1102 set_state_established (void *cls,
1103                        struct GNUNET_STREAM_Socket *socket)
1104 {
1105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1106               "%x: Attaining ESTABLISHED state\n",
1107               socket->our_id);
1108   socket->write_offset = 0;
1109   socket->read_offset = 0;
1110   socket->state = STATE_ESTABLISHED;
1111   if (socket->open_cb)
1112     socket->open_cb (socket->open_cls, socket);
1113 }
1114
1115
1116 /**
1117  * Callback to set state to HELLO_WAIT
1118  *
1119  * @param cls the closure from queue_message
1120  * @param socket the socket to requiring state change
1121  */
1122 static void
1123 set_state_hello_wait (void *cls,
1124                       struct GNUNET_STREAM_Socket *socket)
1125 {
1126   GNUNET_assert (STATE_INIT == socket->state);
1127   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1128               "%x: Attaining HELLO_WAIT state\n",
1129               socket->our_id);
1130   socket->state = STATE_HELLO_WAIT;
1131 }
1132
1133
1134 /**
1135  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1136  *
1137  * @param cls the socket (set from GNUNET_MESH_connect)
1138  * @param tunnel connection to the other end
1139  * @param tunnel_ctx this is NULL
1140  * @param sender who sent the message
1141  * @param message the actual message
1142  * @param atsi performance data for the connection
1143  * @return GNUNET_OK to keep the connection open,
1144  *         GNUNET_SYSERR to close it (signal serious error)
1145  */
1146 static int
1147 client_handle_hello_ack (void *cls,
1148                          struct GNUNET_MESH_Tunnel *tunnel,
1149                          void **tunnel_ctx,
1150                          const struct GNUNET_PeerIdentity *sender,
1151                          const struct GNUNET_MessageHeader *message,
1152                          const struct GNUNET_ATS_Information*atsi)
1153 {
1154   struct GNUNET_STREAM_Socket *socket = cls;
1155   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1156   struct GNUNET_STREAM_HelloAckMessage *reply;
1157
1158   if (GNUNET_PEER_search (sender) != socket->other_peer)
1159     {
1160       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161                   "%x: Received HELLO_ACK from non-confirming peer\n",
1162                   socket->our_id);
1163       return GNUNET_YES;
1164     }
1165   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1166   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167               "%x: Received HELLO_ACK from %x\n",
1168               socket->our_id,
1169               socket->other_peer);
1170
1171   GNUNET_assert (socket->tunnel == tunnel);
1172   switch (socket->state)
1173   {
1174   case STATE_HELLO_WAIT:
1175     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1176     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177                 "%x: Read sequence number %u\n",
1178                 socket->our_id,
1179                 (unsigned int) socket->read_sequence_number);
1180     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1181     /* Get the random sequence number */
1182     socket->write_sequence_number = 
1183       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1184       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185                   "%x: Generated write sequence number %u\n",
1186                   socket->our_id,
1187                   (unsigned int) socket->write_sequence_number);
1188     reply = 
1189       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1190     reply->header.header.size = 
1191       htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1192     reply->header.header.type = 
1193       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1194     reply->sequence_number = htonl (socket->write_sequence_number);
1195     reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1196     queue_message (socket, 
1197                    &reply->header, 
1198                    &set_state_established, 
1199                    NULL);      
1200     return GNUNET_OK;
1201   case STATE_ESTABLISHED:
1202   case STATE_RECEIVE_CLOSE_WAIT:
1203     // call statistics (# ACKs ignored++)
1204     return GNUNET_OK;
1205   case STATE_INIT:
1206   default:
1207     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1209                 socket->our_id,
1210                 socket->other_peer,
1211                 socket->state);
1212     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1213     return GNUNET_SYSERR;
1214   }
1215
1216 }
1217
1218
1219 /**
1220  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1221  *
1222  * @param cls the socket (set from GNUNET_MESH_connect)
1223  * @param tunnel connection to the other end
1224  * @param tunnel_ctx this is NULL
1225  * @param sender who sent the message
1226  * @param message the actual message
1227  * @param atsi performance data for the connection
1228  * @return GNUNET_OK to keep the connection open,
1229  *         GNUNET_SYSERR to close it (signal serious error)
1230  */
1231 static int
1232 client_handle_reset (void *cls,
1233                      struct GNUNET_MESH_Tunnel *tunnel,
1234                      void **tunnel_ctx,
1235                      const struct GNUNET_PeerIdentity *sender,
1236                      const struct GNUNET_MessageHeader *message,
1237                      const struct GNUNET_ATS_Information*atsi)
1238 {
1239   struct GNUNET_STREAM_Socket *socket = cls;
1240
1241   return GNUNET_OK;
1242 }
1243
1244
1245 /**
1246  * Common message handler for handling TRANSMIT_CLOSE messages
1247  *
1248  * @param socket the socket through which the ack was received
1249  * @param tunnel connection to the other end
1250  * @param sender who sent the message
1251  * @param msg the transmit close message
1252  * @param atsi performance data for the connection
1253  * @return GNUNET_OK to keep the connection open,
1254  *         GNUNET_SYSERR to close it (signal serious error)
1255  */
1256 static int
1257 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1258                        struct GNUNET_MESH_Tunnel *tunnel,
1259                        const struct GNUNET_PeerIdentity *sender,
1260                        const struct GNUNET_STREAM_MessageHeader *msg,
1261                        const struct GNUNET_ATS_Information*atsi)
1262 {
1263   struct GNUNET_STREAM_MessageHeader *reply;
1264
1265   switch (socket->state)
1266     {
1267     case STATE_ESTABLISHED:
1268       socket->state = STATE_RECEIVE_CLOSED;
1269
1270       /* Send TRANSMIT_CLOSE_ACK */
1271       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1272       reply->header.type = 
1273         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1274       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1275       queue_message (socket, reply, NULL, NULL);
1276       break;
1277
1278     default:
1279       /* FIXME: Call statistics? */
1280       break;
1281     }
1282   return GNUNET_YES;
1283 }
1284
1285
1286 /**
1287  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1288  *
1289  * @param cls the socket (set from GNUNET_MESH_connect)
1290  * @param tunnel connection to the other end
1291  * @param tunnel_ctx this is NULL
1292  * @param sender who sent the message
1293  * @param message the actual message
1294  * @param atsi performance data for the connection
1295  * @return GNUNET_OK to keep the connection open,
1296  *         GNUNET_SYSERR to close it (signal serious error)
1297  */
1298 static int
1299 client_handle_transmit_close (void *cls,
1300                               struct GNUNET_MESH_Tunnel *tunnel,
1301                               void **tunnel_ctx,
1302                               const struct GNUNET_PeerIdentity *sender,
1303                               const struct GNUNET_MessageHeader *message,
1304                               const struct GNUNET_ATS_Information*atsi)
1305 {
1306   struct GNUNET_STREAM_Socket *socket = cls;
1307   
1308   return handle_transmit_close (socket,
1309                                 tunnel,
1310                                 sender,
1311                                 (struct GNUNET_STREAM_MessageHeader *)message,
1312                                 atsi);
1313 }
1314
1315
1316 /**
1317  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1318  *
1319  * @param cls the socket (set from GNUNET_MESH_connect)
1320  * @param tunnel connection to the other end
1321  * @param tunnel_ctx this is NULL
1322  * @param sender who sent the message
1323  * @param message the actual message
1324  * @param atsi performance data for the connection
1325  * @return GNUNET_OK to keep the connection open,
1326  *         GNUNET_SYSERR to close it (signal serious error)
1327  */
1328 static int
1329 client_handle_transmit_close_ack (void *cls,
1330                                   struct GNUNET_MESH_Tunnel *tunnel,
1331                                   void **tunnel_ctx,
1332                                   const struct GNUNET_PeerIdentity *sender,
1333                                   const struct GNUNET_MessageHeader *message,
1334                                   const struct GNUNET_ATS_Information*atsi)
1335 {
1336   struct GNUNET_STREAM_Socket *socket = cls;
1337
1338   return GNUNET_OK;
1339 }
1340
1341
1342 /**
1343  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1344  *
1345  * @param cls the socket (set from GNUNET_MESH_connect)
1346  * @param tunnel connection to the other end
1347  * @param tunnel_ctx this is NULL
1348  * @param sender who sent the message
1349  * @param message the actual message
1350  * @param atsi performance data for the connection
1351  * @return GNUNET_OK to keep the connection open,
1352  *         GNUNET_SYSERR to close it (signal serious error)
1353  */
1354 static int
1355 client_handle_receive_close (void *cls,
1356                              struct GNUNET_MESH_Tunnel *tunnel,
1357                              void **tunnel_ctx,
1358                              const struct GNUNET_PeerIdentity *sender,
1359                              const struct GNUNET_MessageHeader *message,
1360                              const struct GNUNET_ATS_Information*atsi)
1361 {
1362   struct GNUNET_STREAM_Socket *socket = cls;
1363
1364   return GNUNET_OK;
1365 }
1366
1367
1368 /**
1369  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1370  *
1371  * @param cls the socket (set from GNUNET_MESH_connect)
1372  * @param tunnel connection to the other end
1373  * @param tunnel_ctx this is NULL
1374  * @param sender who sent the message
1375  * @param message the actual message
1376  * @param atsi performance data for the connection
1377  * @return GNUNET_OK to keep the connection open,
1378  *         GNUNET_SYSERR to close it (signal serious error)
1379  */
1380 static int
1381 client_handle_receive_close_ack (void *cls,
1382                                  struct GNUNET_MESH_Tunnel *tunnel,
1383                                  void **tunnel_ctx,
1384                                  const struct GNUNET_PeerIdentity *sender,
1385                                  const struct GNUNET_MessageHeader *message,
1386                                  const struct GNUNET_ATS_Information*atsi)
1387 {
1388   struct GNUNET_STREAM_Socket *socket = cls;
1389
1390   return GNUNET_OK;
1391 }
1392
1393
1394 /**
1395  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1396  *
1397  * @param cls the socket (set from GNUNET_MESH_connect)
1398  * @param tunnel connection to the other end
1399  * @param tunnel_ctx this is NULL
1400  * @param sender who sent the message
1401  * @param message the actual message
1402  * @param atsi performance data for the connection
1403  * @return GNUNET_OK to keep the connection open,
1404  *         GNUNET_SYSERR to close it (signal serious error)
1405  */
1406 static int
1407 client_handle_close (void *cls,
1408                      struct GNUNET_MESH_Tunnel *tunnel,
1409                      void **tunnel_ctx,
1410                      const struct GNUNET_PeerIdentity *sender,
1411                      const struct GNUNET_MessageHeader *message,
1412                      const struct GNUNET_ATS_Information*atsi)
1413 {
1414   struct GNUNET_STREAM_Socket *socket = cls;
1415
1416   return GNUNET_OK;
1417 }
1418
1419
1420 /**
1421  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1422  *
1423  * @param cls the socket (set from GNUNET_MESH_connect)
1424  * @param tunnel connection to the other end
1425  * @param tunnel_ctx this is NULL
1426  * @param sender who sent the message
1427  * @param message the actual message
1428  * @param atsi performance data for the connection
1429  * @return GNUNET_OK to keep the connection open,
1430  *         GNUNET_SYSERR to close it (signal serious error)
1431  */
1432 static int
1433 client_handle_close_ack (void *cls,
1434                          struct GNUNET_MESH_Tunnel *tunnel,
1435                          void **tunnel_ctx,
1436                          const struct GNUNET_PeerIdentity *sender,
1437                          const struct GNUNET_MessageHeader *message,
1438                          const struct GNUNET_ATS_Information*atsi)
1439 {
1440   struct GNUNET_STREAM_Socket *socket = cls;
1441
1442   return GNUNET_OK;
1443 }
1444
1445 /*****************************/
1446 /* Server's Message Handlers */
1447 /*****************************/
1448
1449 /**
1450  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1451  *
1452  * @param cls the closure
1453  * @param tunnel connection to the other end
1454  * @param tunnel_ctx the socket
1455  * @param sender who sent the message
1456  * @param message the actual message
1457  * @param atsi performance data for the connection
1458  * @return GNUNET_OK to keep the connection open,
1459  *         GNUNET_SYSERR to close it (signal serious error)
1460  */
1461 static int
1462 server_handle_data (void *cls,
1463                     struct GNUNET_MESH_Tunnel *tunnel,
1464                     void **tunnel_ctx,
1465                     const struct GNUNET_PeerIdentity *sender,
1466                     const struct GNUNET_MessageHeader *message,
1467                     const struct GNUNET_ATS_Information*atsi)
1468 {
1469   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1470
1471   return handle_data (socket,
1472                       tunnel,
1473                       sender,
1474                       (const struct GNUNET_STREAM_DataMessage *)message,
1475                       atsi);
1476 }
1477
1478
1479 /**
1480  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1481  *
1482  * @param cls the closure
1483  * @param tunnel connection to the other end
1484  * @param tunnel_ctx the socket
1485  * @param sender who sent the message
1486  * @param message the actual message
1487  * @param atsi performance data for the connection
1488  * @return GNUNET_OK to keep the connection open,
1489  *         GNUNET_SYSERR to close it (signal serious error)
1490  */
1491 static int
1492 server_handle_hello (void *cls,
1493                      struct GNUNET_MESH_Tunnel *tunnel,
1494                      void **tunnel_ctx,
1495                      const struct GNUNET_PeerIdentity *sender,
1496                      const struct GNUNET_MessageHeader *message,
1497                      const struct GNUNET_ATS_Information*atsi)
1498 {
1499   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1500   struct GNUNET_STREAM_HelloAckMessage *reply;
1501
1502   if (GNUNET_PEER_search (sender) != socket->other_peer)
1503     {
1504       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505                   "%x: Received HELLO from non-confirming peer\n",
1506                   socket->our_id);
1507       return GNUNET_YES;
1508     }
1509
1510   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1511                  ntohs (message->type));
1512   GNUNET_assert (socket->tunnel == tunnel);
1513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1514               "%x: Received HELLO from %x\n", 
1515               socket->our_id,
1516               socket->other_peer);
1517
1518   if (STATE_INIT == socket->state)
1519     {
1520       /* Get the random sequence number */
1521       socket->write_sequence_number = 
1522         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1523       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524                   "%x: Generated write sequence number %u\n",
1525                   socket->our_id,
1526                   (unsigned int) socket->write_sequence_number);
1527       reply = 
1528         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1529       reply->header.header.size = 
1530         htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1531       reply->header.header.type = 
1532         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1533       reply->sequence_number = htonl (socket->write_sequence_number);
1534       reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1535       queue_message (socket, 
1536                      &reply->header,
1537                      &set_state_hello_wait, 
1538                      NULL);
1539     }
1540   else
1541     {
1542       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1543                   "Client sent HELLO when in state %d\n", socket->state);
1544       /* FIXME: Send RESET? */
1545       
1546     }
1547   return GNUNET_OK;
1548 }
1549
1550
1551 /**
1552  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1553  *
1554  * @param cls the closure
1555  * @param tunnel connection to the other end
1556  * @param tunnel_ctx the socket
1557  * @param sender who sent the message
1558  * @param message the actual message
1559  * @param atsi performance data for the connection
1560  * @return GNUNET_OK to keep the connection open,
1561  *         GNUNET_SYSERR to close it (signal serious error)
1562  */
1563 static int
1564 server_handle_hello_ack (void *cls,
1565                          struct GNUNET_MESH_Tunnel *tunnel,
1566                          void **tunnel_ctx,
1567                          const struct GNUNET_PeerIdentity *sender,
1568                          const struct GNUNET_MessageHeader *message,
1569                          const struct GNUNET_ATS_Information*atsi)
1570 {
1571   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1572   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1573
1574   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1575                  ntohs (message->type));
1576   GNUNET_assert (socket->tunnel == tunnel);
1577   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1578   if (STATE_HELLO_WAIT == socket->state)
1579     {
1580       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1581                   "%x: Received HELLO_ACK from %x\n",
1582                   socket->our_id,
1583                   socket->other_peer);
1584       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1586                   "%x: Read sequence number %u\n",
1587                   socket->our_id,
1588                   (unsigned int) socket->read_sequence_number);
1589       socket->receiver_window_available = 
1590         ntohl (ack_message->receiver_window_size);
1591       /* Attain ESTABLISHED state */
1592       set_state_established (NULL, socket);
1593     }
1594   else
1595     {
1596       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1597                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1598       /* FIXME: Send RESET? */
1599       
1600     }
1601   return GNUNET_OK;
1602 }
1603
1604
1605 /**
1606  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1607  *
1608  * @param cls the closure
1609  * @param tunnel connection to the other end
1610  * @param tunnel_ctx the socket
1611  * @param sender who sent the message
1612  * @param message the actual message
1613  * @param atsi performance data for the connection
1614  * @return GNUNET_OK to keep the connection open,
1615  *         GNUNET_SYSERR to close it (signal serious error)
1616  */
1617 static int
1618 server_handle_reset (void *cls,
1619                      struct GNUNET_MESH_Tunnel *tunnel,
1620                      void **tunnel_ctx,
1621                      const struct GNUNET_PeerIdentity *sender,
1622                      const struct GNUNET_MessageHeader *message,
1623                      const struct GNUNET_ATS_Information*atsi)
1624 {
1625   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1626
1627   return GNUNET_OK;
1628 }
1629
1630
1631 /**
1632  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1633  *
1634  * @param cls the closure
1635  * @param tunnel connection to the other end
1636  * @param tunnel_ctx the socket
1637  * @param sender who sent the message
1638  * @param message the actual message
1639  * @param atsi performance data for the connection
1640  * @return GNUNET_OK to keep the connection open,
1641  *         GNUNET_SYSERR to close it (signal serious error)
1642  */
1643 static int
1644 server_handle_transmit_close (void *cls,
1645                               struct GNUNET_MESH_Tunnel *tunnel,
1646                               void **tunnel_ctx,
1647                               const struct GNUNET_PeerIdentity *sender,
1648                               const struct GNUNET_MessageHeader *message,
1649                               const struct GNUNET_ATS_Information*atsi)
1650 {
1651   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1652
1653   return handle_transmit_close (socket,
1654                                 tunnel,
1655                                 sender,
1656                                 (struct GNUNET_STREAM_MessageHeader *)message,
1657                                 atsi);
1658 }
1659
1660
1661 /**
1662  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1663  *
1664  * @param cls the closure
1665  * @param tunnel connection to the other end
1666  * @param tunnel_ctx the socket
1667  * @param sender who sent the message
1668  * @param message the actual message
1669  * @param atsi performance data for the connection
1670  * @return GNUNET_OK to keep the connection open,
1671  *         GNUNET_SYSERR to close it (signal serious error)
1672  */
1673 static int
1674 server_handle_transmit_close_ack (void *cls,
1675                                   struct GNUNET_MESH_Tunnel *tunnel,
1676                                   void **tunnel_ctx,
1677                                   const struct GNUNET_PeerIdentity *sender,
1678                                   const struct GNUNET_MessageHeader *message,
1679                                   const struct GNUNET_ATS_Information*atsi)
1680 {
1681   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1682
1683   return GNUNET_OK;
1684 }
1685
1686
1687 /**
1688  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1689  *
1690  * @param cls the closure
1691  * @param tunnel connection to the other end
1692  * @param tunnel_ctx the socket
1693  * @param sender who sent the message
1694  * @param message the actual message
1695  * @param atsi performance data for the connection
1696  * @return GNUNET_OK to keep the connection open,
1697  *         GNUNET_SYSERR to close it (signal serious error)
1698  */
1699 static int
1700 server_handle_receive_close (void *cls,
1701                              struct GNUNET_MESH_Tunnel *tunnel,
1702                              void **tunnel_ctx,
1703                              const struct GNUNET_PeerIdentity *sender,
1704                              const struct GNUNET_MessageHeader *message,
1705                              const struct GNUNET_ATS_Information*atsi)
1706 {
1707   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1708
1709   return GNUNET_OK;
1710 }
1711
1712
1713 /**
1714  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1715  *
1716  * @param cls the closure
1717  * @param tunnel connection to the other end
1718  * @param tunnel_ctx the socket
1719  * @param sender who sent the message
1720  * @param message the actual message
1721  * @param atsi performance data for the connection
1722  * @return GNUNET_OK to keep the connection open,
1723  *         GNUNET_SYSERR to close it (signal serious error)
1724  */
1725 static int
1726 server_handle_receive_close_ack (void *cls,
1727                                  struct GNUNET_MESH_Tunnel *tunnel,
1728                                  void **tunnel_ctx,
1729                                  const struct GNUNET_PeerIdentity *sender,
1730                                  const struct GNUNET_MessageHeader *message,
1731                                  const struct GNUNET_ATS_Information*atsi)
1732 {
1733   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1734
1735   return GNUNET_OK;
1736 }
1737
1738
1739 /**
1740  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1741  *
1742  * @param cls the closure
1743  * @param tunnel connection to the other end
1744  * @param tunnel_ctx the socket
1745  * @param sender who sent the message
1746  * @param message the actual message
1747  * @param atsi performance data for the connection
1748  * @return GNUNET_OK to keep the connection open,
1749  *         GNUNET_SYSERR to close it (signal serious error)
1750  */
1751 static int
1752 server_handle_close (void *cls,
1753                      struct GNUNET_MESH_Tunnel *tunnel,
1754                      void **tunnel_ctx,
1755                      const struct GNUNET_PeerIdentity *sender,
1756                      const struct GNUNET_MessageHeader *message,
1757                      const struct GNUNET_ATS_Information*atsi)
1758 {
1759   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1760
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1767  *
1768  * @param cls the closure
1769  * @param tunnel connection to the other end
1770  * @param tunnel_ctx the socket
1771  * @param sender who sent the message
1772  * @param message the actual message
1773  * @param atsi performance data for the connection
1774  * @return GNUNET_OK to keep the connection open,
1775  *         GNUNET_SYSERR to close it (signal serious error)
1776  */
1777 static int
1778 server_handle_close_ack (void *cls,
1779                          struct GNUNET_MESH_Tunnel *tunnel,
1780                          void **tunnel_ctx,
1781                          const struct GNUNET_PeerIdentity *sender,
1782                          const struct GNUNET_MessageHeader *message,
1783                          const struct GNUNET_ATS_Information*atsi)
1784 {
1785   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1786
1787   return GNUNET_OK;
1788 }
1789
1790
1791 /**
1792  * Message Handler for mesh
1793  *
1794  * @param socket the socket through which the ack was received
1795  * @param tunnel connection to the other end
1796  * @param sender who sent the message
1797  * @param ack the acknowledgment message
1798  * @param atsi performance data for the connection
1799  * @return GNUNET_OK to keep the connection open,
1800  *         GNUNET_SYSERR to close it (signal serious error)
1801  */
1802 static int
1803 handle_ack (struct GNUNET_STREAM_Socket *socket,
1804             struct GNUNET_MESH_Tunnel *tunnel,
1805             const struct GNUNET_PeerIdentity *sender,
1806             const struct GNUNET_STREAM_AckMessage *ack,
1807             const struct GNUNET_ATS_Information*atsi)
1808 {
1809   unsigned int packet;
1810   int need_retransmission;
1811
1812   if (GNUNET_PEER_search (sender) != socket->other_peer)
1813     {
1814       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1815                   "%x: Received ACK from non-confirming peer\n",
1816                   socket->our_id);
1817       return GNUNET_YES;
1818     }
1819
1820   switch (socket->state)
1821     {
1822     case (STATE_ESTABLISHED):
1823       if (NULL == socket->write_handle)
1824         {
1825           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1826                       "%x: Received DATA_ACK when write_handle is NULL\n",
1827                       socket->our_id);
1828           return GNUNET_OK;
1829         }
1830       
1831       if (!((socket->write_sequence_number 
1832              - htonl (ack->base_sequence_number)) < 64))
1833         {
1834           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1835                       "%x: Received DATA_ACK with unexpected base sequence",
1836                       "number\n",
1837                       socket->our_id);
1838           return GNUNET_OK;
1839         }
1840       /* FIXME: include the case when write_handle is cancelled - ignore the 
1841          acks */
1842
1843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1844                   "%x: Received DATA_ACK from %x\n",
1845                   socket->our_id,
1846                   socket->other_peer);
1847       
1848       /* Cancel the retransmission task */
1849       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1850         {
1851           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1852           socket->retransmission_timeout_task_id = 
1853             GNUNET_SCHEDULER_NO_TASK;
1854         }
1855
1856       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1857       socket->receiver_window_available = 
1858         ntohl (ack->receive_window_remaining);
1859
1860       /* Check if we have received all acknowledgements */
1861       need_retransmission = GNUNET_NO;
1862       for (packet=0; packet < 64; packet++)
1863         {
1864           if (NULL == socket->write_handle->messages[packet]) break;
1865           if (GNUNET_YES != ackbitmap_is_bit_set 
1866               (&socket->write_handle->ack_bitmap,packet))
1867             {
1868               need_retransmission = GNUNET_YES;
1869               break;
1870             }
1871         }
1872       if (GNUNET_YES == need_retransmission)
1873         {
1874           write_data (socket);
1875         }
1876       else      /* We have to call the write continuation callback now */
1877         {
1878           /* Free the packets */
1879           for (packet=0; packet < 64; packet++)
1880             {
1881               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1882             }
1883           if (NULL != socket->write_handle->write_cont)
1884             socket->write_handle->write_cont
1885               (socket->write_handle->write_cont_cls,
1886                socket->status,
1887                socket->write_handle->size);
1888           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1889                       "%x: Write completion callback completed\n",
1890                       socket->our_id);
1891           /* We are done with the write handle - Freeing it */
1892           GNUNET_free (socket->write_handle);
1893           socket->write_handle = NULL;
1894         }
1895       break;
1896     default:
1897       break;
1898     }
1899   return GNUNET_OK;
1900 }
1901
1902
1903 /**
1904  * Message Handler for mesh
1905  *
1906  * @param cls the 'struct GNUNET_STREAM_Socket'
1907  * @param tunnel connection to the other end
1908  * @param tunnel_ctx unused
1909  * @param sender who sent the message
1910  * @param message the actual message
1911  * @param atsi performance data for the connection
1912  * @return GNUNET_OK to keep the connection open,
1913  *         GNUNET_SYSERR to close it (signal serious error)
1914  */
1915 static int
1916 client_handle_ack (void *cls,
1917                    struct GNUNET_MESH_Tunnel *tunnel,
1918                    void **tunnel_ctx,
1919                    const struct GNUNET_PeerIdentity *sender,
1920                    const struct GNUNET_MessageHeader *message,
1921                    const struct GNUNET_ATS_Information*atsi)
1922 {
1923   struct GNUNET_STREAM_Socket *socket = cls;
1924   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1925  
1926   return handle_ack (socket, tunnel, sender, ack, atsi);
1927 }
1928
1929
1930 /**
1931  * Message Handler for mesh
1932  *
1933  * @param cls the server's listen socket
1934  * @param tunnel connection to the other end
1935  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1936  * @param sender who sent the message
1937  * @param message the actual message
1938  * @param atsi performance data for the connection
1939  * @return GNUNET_OK to keep the connection open,
1940  *         GNUNET_SYSERR to close it (signal serious error)
1941  */
1942 static int
1943 server_handle_ack (void *cls,
1944                    struct GNUNET_MESH_Tunnel *tunnel,
1945                    void **tunnel_ctx,
1946                    const struct GNUNET_PeerIdentity *sender,
1947                    const struct GNUNET_MessageHeader *message,
1948                    const struct GNUNET_ATS_Information*atsi)
1949 {
1950   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1951   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1952  
1953   return handle_ack (socket, tunnel, sender, ack, atsi);
1954 }
1955
1956
1957 /**
1958  * For client message handlers, the stream socket is in the
1959  * closure argument.
1960  */
1961 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1962   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1963   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1964    sizeof (struct GNUNET_STREAM_AckMessage) },
1965   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1966    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1967   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1968    sizeof (struct GNUNET_STREAM_MessageHeader)},
1969   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1970    sizeof (struct GNUNET_STREAM_MessageHeader)},
1971   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1972    sizeof (struct GNUNET_STREAM_MessageHeader)},
1973   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1974    sizeof (struct GNUNET_STREAM_MessageHeader)},
1975   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1976    sizeof (struct GNUNET_STREAM_MessageHeader)},
1977   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1978    sizeof (struct GNUNET_STREAM_MessageHeader)},
1979   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1980    sizeof (struct GNUNET_STREAM_MessageHeader)},
1981   {NULL, 0, 0}
1982 };
1983
1984
1985 /**
1986  * For server message handlers, the stream socket is in the
1987  * tunnel context, and the listen socket in the closure argument.
1988  */
1989 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1990   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1991   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1992    sizeof (struct GNUNET_STREAM_AckMessage) },
1993   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1994    sizeof (struct GNUNET_STREAM_MessageHeader)},
1995   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1996    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1997   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1998    sizeof (struct GNUNET_STREAM_MessageHeader)},
1999   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2000    sizeof (struct GNUNET_STREAM_MessageHeader)},
2001   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2002    sizeof (struct GNUNET_STREAM_MessageHeader)},
2003   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2004    sizeof (struct GNUNET_STREAM_MessageHeader)},
2005   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2006    sizeof (struct GNUNET_STREAM_MessageHeader)},
2007   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2008    sizeof (struct GNUNET_STREAM_MessageHeader)},
2009   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2010    sizeof (struct GNUNET_STREAM_MessageHeader)},
2011   {NULL, 0, 0}
2012 };
2013
2014
2015 /**
2016  * Function called when our target peer is connected to our tunnel
2017  *
2018  * @param cls the socket for which this tunnel is created
2019  * @param peer the peer identity of the target
2020  * @param atsi performance data for the connection
2021  */
2022 static void
2023 mesh_peer_connect_callback (void *cls,
2024                             const struct GNUNET_PeerIdentity *peer,
2025                             const struct GNUNET_ATS_Information * atsi)
2026 {
2027   struct GNUNET_STREAM_Socket *socket = cls;
2028   struct GNUNET_STREAM_MessageHeader *message;
2029   GNUNET_PEER_Id connected_peer;
2030
2031   connected_peer = GNUNET_PEER_search (peer);
2032   
2033   if (connected_peer != socket->other_peer)
2034     {
2035       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2036                   "%x: A peer which is not our target has connected",
2037                   "to our tunnel\n",
2038                   socket->our_id);
2039       return;
2040     }
2041   
2042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2043               "%x: Target peer %x connected\n", 
2044               socket->our_id,
2045               connected_peer);
2046   
2047   /* Set state to INIT */
2048   socket->state = STATE_INIT;
2049
2050   /* Send HELLO message */
2051   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2052   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2053   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2054   queue_message (socket,
2055                  message,
2056                  &set_state_hello_wait,
2057                  NULL);
2058
2059   /* Call open callback */
2060   if (NULL == socket->open_cb)
2061     {
2062       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2063                   "STREAM_open callback is NULL\n");
2064     }
2065 }
2066
2067
2068 /**
2069  * Function called when our target peer is disconnected from our tunnel
2070  *
2071  * @param cls the socket associated which this tunnel
2072  * @param peer the peer identity of the target
2073  */
2074 static void
2075 mesh_peer_disconnect_callback (void *cls,
2076                                const struct GNUNET_PeerIdentity *peer)
2077 {
2078
2079 }
2080
2081
2082 /**
2083  * Method called whenever a peer creates a tunnel to us
2084  *
2085  * @param cls closure
2086  * @param tunnel new handle to the tunnel
2087  * @param initiator peer that started the tunnel
2088  * @param atsi performance information for the tunnel
2089  * @return initial tunnel context for the tunnel
2090  *         (can be NULL -- that's not an error)
2091  */
2092 static void *
2093 new_tunnel_notify (void *cls,
2094                    struct GNUNET_MESH_Tunnel *tunnel,
2095                    const struct GNUNET_PeerIdentity *initiator,
2096                    const struct GNUNET_ATS_Information *atsi)
2097 {
2098   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2099   struct GNUNET_STREAM_Socket *socket;
2100
2101   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2102      from the same peer again until the socket is closed */
2103
2104   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2105   socket->other_peer = GNUNET_PEER_intern (initiator);
2106   socket->tunnel = tunnel;
2107   socket->session_id = 0;       /* FIXME */
2108   socket->state = STATE_INIT;
2109   socket->derived = GNUNET_YES;
2110   socket->our_id = lsocket->our_id;
2111   
2112   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2113               "%x: Peer %x initiated tunnel to us\n", 
2114               socket->our_id,
2115               socket->other_peer);
2116   
2117   /* FIXME: Copy MESH handle from lsocket to socket */
2118   /* FIXME: What if listen_cb is NULL */
2119   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
2120                                            socket,
2121                                            initiator))
2122     {
2123       socket->state = STATE_CLOSED;
2124       /* FIXME: Send CLOSE message and then free */
2125       GNUNET_free (socket);
2126       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
2127     }
2128   return socket;
2129 }
2130
2131
2132 /**
2133  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2134  * any associated state.  This function is NOT called if the client has
2135  * explicitly asked for the tunnel to be destroyed using
2136  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2137  * the tunnel.
2138  *
2139  * @param cls closure (set from GNUNET_MESH_connect)
2140  * @param tunnel connection to the other end (henceforth invalid)
2141  * @param tunnel_ctx place where local state associated
2142  *                   with the tunnel is stored
2143  */
2144 static void 
2145 tunnel_cleaner (void *cls,
2146                 const struct GNUNET_MESH_Tunnel *tunnel,
2147                 void *tunnel_ctx)
2148 {
2149   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2150   
2151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2152               "%x: Peer %x has terminated connection abruptly\n",
2153               socket->our_id,
2154               socket->other_peer);
2155
2156   socket->status = GNUNET_STREAM_SHUTDOWN;
2157
2158   /* Clear Transmit handles */
2159   if (NULL != socket->transmit_handle)
2160     {
2161       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2162       socket->transmit_handle = NULL;
2163     }
2164   socket->tunnel = NULL;
2165 }
2166
2167
2168 /*****************/
2169 /* API functions */
2170 /*****************/
2171
2172
2173 /**
2174  * Tries to open a stream to the target peer
2175  *
2176  * @param cfg configuration to use
2177  * @param target the target peer to which the stream has to be opened
2178  * @param app_port the application port number which uniquely identifies this
2179  *            stream
2180  * @param open_cb this function will be called after stream has be established 
2181  * @param open_cb_cls the closure for open_cb
2182  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2183  * @return if successful it returns the stream socket; NULL if stream cannot be
2184  *         opened 
2185  */
2186 struct GNUNET_STREAM_Socket *
2187 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2188                     const struct GNUNET_PeerIdentity *target,
2189                     GNUNET_MESH_ApplicationType app_port,
2190                     GNUNET_STREAM_OpenCallback open_cb,
2191                     void *open_cb_cls,
2192                     ...)
2193 {
2194   struct GNUNET_STREAM_Socket *socket;
2195   struct GNUNET_PeerIdentity own_peer_id;
2196   enum GNUNET_STREAM_Option option;
2197   va_list vargs;                /* Variable arguments */
2198
2199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2200               "%s\n", __func__);
2201
2202   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2203   socket->other_peer = GNUNET_PEER_intern (target);
2204   socket->open_cb = open_cb;
2205   socket->open_cls = open_cb_cls;
2206   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2207   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2208   
2209   /* Set defaults */
2210   socket->retransmit_timeout = 
2211     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2212
2213   va_start (vargs, open_cb_cls); /* Parse variable args */
2214   do {
2215     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2216     switch (option)
2217       {
2218       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2219         /* Expect struct GNUNET_TIME_Relative */
2220         socket->retransmit_timeout = va_arg (vargs,
2221                                              struct GNUNET_TIME_Relative);
2222         break;
2223       case GNUNET_STREAM_OPTION_END:
2224         break;
2225       }
2226   } while (GNUNET_STREAM_OPTION_END != option);
2227   va_end (vargs);               /* End of variable args parsing */
2228   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2229                                       10,  /* QUEUE size as parameter? */
2230                                       socket, /* cls */
2231                                       NULL, /* No inbound tunnel handler */
2232                                       &tunnel_cleaner, /* FIXME: not required? */
2233                                       client_message_handlers,
2234                                       &app_port); /* We don't get inbound tunnels */
2235   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2236     {
2237       GNUNET_free (socket);
2238       return NULL;
2239     }
2240
2241   /* Now create the mesh tunnel to target */
2242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2243               "Creating MESH Tunnel\n");
2244   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2245                                               NULL, /* Tunnel context */
2246                                               &mesh_peer_connect_callback,
2247                                               &mesh_peer_disconnect_callback,
2248                                               socket);
2249   GNUNET_assert (NULL != socket->tunnel);
2250   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2251                                         target);
2252   
2253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254               "%s() END\n", __func__);
2255   return socket;
2256 }
2257
2258
2259 /**
2260  * Shutdown the stream for reading or writing (man 2 shutdown).
2261  *
2262  * @param socket the stream socket
2263  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2264  */
2265 void
2266 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2267                         int how)
2268 {
2269   return;
2270 }
2271
2272
2273 /**
2274  * Closes the stream
2275  *
2276  * @param socket the stream socket
2277  */
2278 void
2279 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2280 {
2281   struct MessageQueue *head;
2282
2283   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2284   {
2285     /* socket closed with read task pending!? */
2286     GNUNET_break (0);
2287     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2288     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2289   }
2290
2291   /* Clear Transmit handles */
2292   if (NULL != socket->transmit_handle)
2293     {
2294       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2295       socket->transmit_handle = NULL;
2296     }
2297
2298   /* Clear existing message queue */
2299   while (NULL != (head = socket->queue_head)) {
2300     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2301                                  socket->queue_tail,
2302                                  head);
2303     GNUNET_free (head->message);
2304     GNUNET_free (head);
2305   }
2306
2307   /* Close associated tunnel */
2308   if (NULL != socket->tunnel)
2309     {
2310       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2311       socket->tunnel = NULL;
2312     }
2313
2314   /* Close mesh connection */
2315   if (NULL != socket->mesh && GNUNET_YES != socket->derived)
2316     {
2317       GNUNET_MESH_disconnect (socket->mesh);
2318       socket->mesh = NULL;
2319     }
2320   
2321   /* Release receive buffer */
2322   if (NULL != socket->receive_buffer)
2323     {
2324       GNUNET_free (socket->receive_buffer);
2325     }
2326
2327   GNUNET_free (socket);
2328 }
2329
2330
2331 /**
2332  * Listens for stream connections for a specific application ports
2333  *
2334  * @param cfg the configuration to use
2335  * @param app_port the application port for which new streams will be accepted
2336  * @param listen_cb this function will be called when a peer tries to establish
2337  *            a stream with us
2338  * @param listen_cb_cls closure for listen_cb
2339  * @return listen socket, NULL for any error
2340  */
2341 struct GNUNET_STREAM_ListenSocket *
2342 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2343                       GNUNET_MESH_ApplicationType app_port,
2344                       GNUNET_STREAM_ListenCallback listen_cb,
2345                       void *listen_cb_cls)
2346 {
2347   /* FIXME: Add variable args for passing configration options? */
2348   struct GNUNET_STREAM_ListenSocket *lsocket;
2349   struct GNUNET_PeerIdentity our_peer_id;
2350
2351   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2352   lsocket->port = app_port;
2353   lsocket->listen_cb = listen_cb;
2354   lsocket->listen_cb_cls = listen_cb_cls;
2355   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2356   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2357   lsocket->mesh = GNUNET_MESH_connect (cfg,
2358                                        10, /* FIXME: QUEUE size as parameter? */
2359                                        lsocket, /* Closure */
2360                                        &new_tunnel_notify,
2361                                        &tunnel_cleaner,
2362                                        server_message_handlers,
2363                                        &app_port);
2364   GNUNET_assert (NULL != lsocket->mesh);
2365   return lsocket;
2366 }
2367
2368
2369 /**
2370  * Closes the listen socket
2371  *
2372  * @param lsocket the listen socket
2373  */
2374 void
2375 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2376 {
2377   /* Close MESH connection */
2378   GNUNET_assert (NULL != lsocket->mesh);
2379   GNUNET_MESH_disconnect (lsocket->mesh);
2380   
2381   GNUNET_free (lsocket);
2382 }
2383
2384
2385 /**
2386  * Tries to write the given data to the stream
2387  *
2388  * @param socket the socket representing a stream
2389  * @param data the data buffer from where the data is written into the stream
2390  * @param size the number of bytes to be written from the data buffer
2391  * @param timeout the timeout period
2392  * @param write_cont the function to call upon writing some bytes into the stream
2393  * @param write_cont_cls the closure
2394  * @return handle to cancel the operation
2395  */
2396 struct GNUNET_STREAM_IOWriteHandle *
2397 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2398                      const void *data,
2399                      size_t size,
2400                      struct GNUNET_TIME_Relative timeout,
2401                      GNUNET_STREAM_CompletionContinuation write_cont,
2402                      void *write_cont_cls)
2403 {
2404   unsigned int num_needed_packets;
2405   unsigned int packet;
2406   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2407   uint32_t packet_size;
2408   uint32_t payload_size;
2409   struct GNUNET_STREAM_DataMessage *data_msg;
2410   const void *sweep;
2411   struct GNUNET_TIME_Relative ack_deadline;
2412
2413   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2414               "%s\n", __func__);
2415
2416   /* Return NULL if there is already a write request pending */
2417   if (NULL != socket->write_handle)
2418   {
2419     GNUNET_break (0);
2420     return NULL;
2421   }
2422   if (!((STATE_ESTABLISHED == socket->state)
2423         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2424         || (STATE_RECEIVE_CLOSED == socket->state)))
2425     {
2426       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2427                   "%x: Attempting to write on a closed (OR) not-yet-established"
2428                   "stream\n",
2429                   socket->our_id);
2430       return NULL;
2431     } 
2432   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2433     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2434   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2435   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2436   io_handle->write_cont = write_cont;
2437   io_handle->write_cont_cls = write_cont_cls;
2438   io_handle->size = size;
2439   sweep = data;
2440   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2441      determined from RTT */
2442   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2443   /* Divide the given buffer into packets for sending */
2444   for (packet=0; packet < num_needed_packets; packet++)
2445     {
2446       if ((packet + 1) * max_payload_size < size) 
2447         {
2448           payload_size = max_payload_size;
2449           packet_size = MAX_PACKET_SIZE;
2450         }
2451       else 
2452         {
2453           payload_size = size - packet * max_payload_size;
2454           packet_size =  payload_size + sizeof (struct
2455                                                 GNUNET_STREAM_DataMessage); 
2456         }
2457       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2458       io_handle->messages[packet]->header.header.size = htons (packet_size);
2459       io_handle->messages[packet]->header.header.type =
2460         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2461       io_handle->messages[packet]->sequence_number =
2462         htonl (socket->write_sequence_number++);
2463       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2464
2465       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2466          determined from RTT */
2467       io_handle->messages[packet]->ack_deadline =
2468         GNUNET_TIME_relative_hton (ack_deadline);
2469       data_msg = io_handle->messages[packet];
2470       /* Copy data from given buffer to the packet */
2471       memcpy (&data_msg[1],
2472               sweep,
2473               payload_size);
2474       sweep += payload_size;
2475       socket->write_offset += payload_size;
2476     }
2477   socket->write_handle = io_handle;
2478   write_data (socket);
2479
2480   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2481               "%s() END\n", __func__);
2482
2483   return io_handle;
2484 }
2485
2486
2487 /**
2488  * Tries to read data from the stream
2489  *
2490  * @param socket the socket representing a stream
2491  * @param timeout the timeout period
2492  * @param proc function to call with data (once only)
2493  * @param proc_cls the closure for proc
2494  * @return handle to cancel the operation
2495  */
2496 struct GNUNET_STREAM_IOReadHandle *
2497 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2498                     struct GNUNET_TIME_Relative timeout,
2499                     GNUNET_STREAM_DataProcessor proc,
2500                     void *proc_cls)
2501 {
2502   struct GNUNET_STREAM_IOReadHandle *read_handle;
2503   
2504   /* Return NULL if there is already a read handle; the user has to cancel that
2505   first before continuing or has to wait until it is completed */
2506   if (NULL != socket->read_handle) return NULL;
2507
2508   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2509   read_handle->proc = proc;
2510   socket->read_handle = read_handle;
2511
2512   /* Check if we have a packet at bitmap 0 */
2513   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2514                                           0))
2515     {
2516       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2517                                                        socket);
2518    
2519     }
2520   
2521   /* Setup the read timeout task */
2522   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2523                                                                &read_io_timeout,
2524                                                                socket);
2525   return read_handle;
2526 }
2527
2528
2529 /**
2530  * Cancel pending write operation.
2531  *
2532  * @param ioh handle to operation to cancel
2533  */
2534 void
2535 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2536 {
2537   return;
2538 }
2539
2540
2541 /**
2542  * Cancel pending read operation.
2543  *
2544  * @param ioh handle to operation to cancel
2545  */
2546 void
2547 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2548 {
2549   return;
2550 }