fixed byte conversion bugs
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * Task identifier for the read io timeout task
235    */
236   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
237
238   /**
239    * Task identifier for retransmission task after timeout
240    */
241   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
242
243   /**
244    * The task for sending timely Acks
245    */
246   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
247
248   /**
249    * Task scheduled to continue a read operation.
250    */
251   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
252
253   /**
254    * The state of the protocol associated with this socket
255    */
256   enum State state;
257
258   /**
259    * The status of the socket
260    */
261   enum GNUNET_STREAM_Status status;
262
263   /**
264    * The number of previous timeouts; FIXME: currently not used
265    */
266   unsigned int retries;
267
268   /**
269    * Is this socket derived from listen socket?
270    */
271   unsigned int derived;
272   
273   /**
274    * The peer identity of the peer at the other end of the stream
275    */
276   GNUNET_PEER_Id other_peer;
277
278   /**
279    * Our Peer Identity (for debugging)
280    */
281   GNUNET_PEER_Id our_id;
282
283   /**
284    * The application port number (type: uint32_t)
285    */
286   GNUNET_MESH_ApplicationType app_port;
287
288   /**
289    * The session id associated with this stream connection
290    * FIXME: Not used currently, may be removed
291    */
292   uint32_t session_id;
293
294   /**
295    * Write sequence number. Set to random when sending HELLO(client) and
296    * HELLO_ACK(server) 
297    */
298   uint32_t write_sequence_number;
299
300   /**
301    * Read sequence number. This number's value is determined during handshake
302    */
303   uint32_t read_sequence_number;
304
305   /**
306    * The receiver buffer size
307    */
308   uint32_t receive_buffer_size;
309
310   /**
311    * The receiver buffer boundaries
312    */
313   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
314
315   /**
316    * receiver's available buffer after the last acknowledged packet
317    */
318   uint32_t receiver_window_available;
319
320   /**
321    * The offset pointer used during write operation
322    */
323   uint32_t write_offset;
324
325   /**
326    * The offset after which we are expecting data
327    */
328   uint32_t read_offset;
329
330   /**
331    * The offset upto which user has read from the received buffer
332    */
333   uint32_t copy_offset;
334 };
335
336
337 /**
338  * A socket for listening
339  */
340 struct GNUNET_STREAM_ListenSocket
341 {
342   /**
343    * The mesh handle
344    */
345   struct GNUNET_MESH_Handle *mesh;
346
347   /**
348    * The callback function which is called after successful opening socket
349    */
350   GNUNET_STREAM_ListenCallback listen_cb;
351
352   /**
353    * The call back closure
354    */
355   void *listen_cb_cls;
356
357   /**
358    * Our interned Peer's identity
359    */
360   GNUNET_PEER_Id our_id;
361
362   /**
363    * The service port
364    * FIXME: Remove if not required!
365    */
366   GNUNET_MESH_ApplicationType port;
367 };
368
369
370 /**
371  * The IO Write Handle
372  */
373 struct GNUNET_STREAM_IOWriteHandle
374 {
375   /**
376    * The packet_buffers associated with this Handle
377    */
378   struct GNUNET_STREAM_DataMessage *messages[64];
379
380   /**
381    * The write continuation callback
382    */
383   GNUNET_STREAM_CompletionContinuation write_cont;
384
385   /**
386    * Write continuation closure
387    */
388   void *write_cont_cls;
389
390   /**
391    * The bitmap of this IOHandle; Corresponding bit for a message is set when
392    * it has been acknowledged by the receiver
393    */
394   GNUNET_STREAM_AckBitmap ack_bitmap;
395
396   /**
397    * Number of bytes in this write handle
398    */
399   size_t size;
400
401   /**
402    * Number of packets sent before waiting for an ack
403    *
404    * FIXME: Do we need this?
405    */
406   unsigned int sent_packets;
407 };
408
409
410 /**
411  * The IO Read Handle
412  */
413 struct GNUNET_STREAM_IOReadHandle
414 {
415   /**
416    * Callback for the read processor
417    */
418   GNUNET_STREAM_DataProcessor proc;
419
420   /**
421    * The closure pointer for the read processor callback
422    */
423   void *proc_cls;
424 };
425
426
427 /**
428  * Default value in seconds for various timeouts
429  */
430 static unsigned int default_timeout = 10;
431
432 /**
433  * Callback function for sending hello message
434  *
435  * @param cls closure the socket
436  * @param size number of bytes available in buf
437  * @param buf where the callee should write the message
438  * @return number of bytes written to buf
439  */
440 static size_t
441 send_message_notify (void *cls, size_t size, void *buf)
442 {
443   struct GNUNET_STREAM_Socket *socket = cls;
444   struct GNUNET_PeerIdentity target;
445   struct MessageQueue *head;
446   size_t ret;
447
448   socket->transmit_handle = NULL; /* Remove the transmit handle */
449   head = socket->queue_head;
450   if (NULL == head)
451     return 0; /* just to be safe */
452   GNUNET_PEER_resolve (socket->other_peer, &target);
453   if (0 == size)                /* request timed out */
454     {
455       socket->retries++;
456       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457                   "Message sending timed out. Retry %d \n",
458                   socket->retries);
459       socket->transmit_handle = 
460         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
461                                            0, /* Corking */
462                                            1, /* Priority */
463                                            /* FIXME: exponential backoff */
464                                            socket->retransmit_timeout,
465                                            &target,
466                                            ntohs (head->message->header.size),
467                                            &send_message_notify,
468                                            socket);
469       return 0;
470     }
471
472   ret = ntohs (head->message->header.size);
473   GNUNET_assert (size >= ret);
474   memcpy (buf, head->message, ret);
475   if (NULL != head->finish_cb)
476     {
477       head->finish_cb (head->finish_cb_cls, socket);
478     }
479   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
480                                socket->queue_tail,
481                                head);
482   GNUNET_free (head->message);
483   GNUNET_free (head);
484   head = socket->queue_head;
485   if (NULL != head)    /* more pending messages to send */
486     {
487       socket->retries = 0;
488       socket->transmit_handle = 
489         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
490                                            0, /* Corking */
491                                            1, /* Priority */
492                                            /* FIXME: exponential backoff */
493                                            socket->retransmit_timeout,
494                                            &target,
495                                            ntohs (head->message->header.size),
496                                            &send_message_notify,
497                                            socket);
498     }
499   return ret;
500 }
501
502
503 /**
504  * Queues a message for sending using the mesh connection of a socket
505  *
506  * @param socket the socket whose mesh connection is used
507  * @param message the message to be sent
508  * @param finish_cb the callback to be called when the message is sent
509  * @param finish_cb_cls the closure for the callback
510  */
511 static void
512 queue_message (struct GNUNET_STREAM_Socket *socket,
513                struct GNUNET_STREAM_MessageHeader *message,
514                SendFinishCallback finish_cb,
515                void *finish_cb_cls)
516 {
517   struct MessageQueue *queue_entity;
518   struct GNUNET_PeerIdentity target;
519
520   GNUNET_assert 
521     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
522      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
523
524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525               "%x: Queueing message of type %d and size %d\n",
526               socket->our_id,
527               ntohs (message->header.type),
528               ntohs (message->header.size));
529   GNUNET_assert (NULL != message);
530   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
531   queue_entity->message = message;
532   queue_entity->finish_cb = finish_cb;
533   queue_entity->finish_cb_cls = finish_cb_cls;
534   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
535                                     socket->queue_tail,
536                                     queue_entity);
537   if (NULL == socket->transmit_handle)
538   {
539     socket->retries = 0;
540     GNUNET_PEER_resolve (socket->other_peer, &target);
541     socket->transmit_handle = 
542       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
543                                          0, /* Corking */
544                                          1, /* Priority */
545                                          socket->retransmit_timeout,
546                                          &target,
547                                          ntohs (message->header.size),
548                                          &send_message_notify,
549                                          socket);
550   }
551 }
552
553
554 /**
555  * Copies a message and queues it for sending using the mesh connection of
556  * given socket 
557  *
558  * @param socket the socket whose mesh connection is used
559  * @param message the message to be sent
560  * @param finish_cb the callback to be called when the message is sent
561  * @param finish_cb_cls the closure for the callback
562  */
563 static void
564 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
565                         const struct GNUNET_STREAM_MessageHeader *message,
566                         SendFinishCallback finish_cb,
567                         void *finish_cb_cls)
568 {
569   struct GNUNET_STREAM_MessageHeader *msg_copy;
570   uint16_t size;
571   
572   size = ntohs (message->header.size);
573   msg_copy = GNUNET_malloc (size);
574   memcpy (msg_copy, message, size);
575   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
576 }
577
578
579 /**
580  * Callback function for sending ack message
581  *
582  * @param cls closure the ACK message created in ack_task
583  * @param size number of bytes available in buffer
584  * @param buf where the callee should write the message
585  * @return number of bytes written to buf
586  */
587 static size_t
588 send_ack_notify (void *cls, size_t size, void *buf)
589 {
590   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
591
592   if (0 == size)
593     {
594       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595                   "%s called with size 0\n", __func__);
596       return 0;
597     }
598   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
599   
600   size = ntohs (ack_msg->header.header.size);
601   memcpy (buf, ack_msg, size);
602   return size;
603 }
604
605 /**
606  * Writes data using the given socket. The amount of data written is limited by
607  * the receiver_window_size
608  *
609  * @param socket the socket to use
610  */
611 static void 
612 write_data (struct GNUNET_STREAM_Socket *socket);
613
614 /**
615  * Task for retransmitting data messages if they aren't ACK before their ack
616  * deadline 
617  *
618  * @param cls the socket
619  * @param tc the Task context
620  */
621 static void
622 retransmission_timeout_task (void *cls,
623                              const struct GNUNET_SCHEDULER_TaskContext *tc)
624 {
625   struct GNUNET_STREAM_Socket *socket = cls;
626   
627   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
628     return;
629
630   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
631               "%x: Retransmitting DATA...\n", socket->our_id);
632   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
633   write_data (socket);
634 }
635
636
637 /**
638  * Task for sending ACK message
639  *
640  * @param cls the socket
641  * @param tc the Task context
642  */
643 static void
644 ack_task (void *cls,
645           const struct GNUNET_SCHEDULER_TaskContext *tc)
646 {
647   struct GNUNET_STREAM_Socket *socket = cls;
648   struct GNUNET_STREAM_AckMessage *ack_msg;
649   struct GNUNET_PeerIdentity target;
650
651   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
652     {
653       return;
654     }
655
656   socket->ack_task_id = 0;
657
658   /* Create the ACK Message */
659   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
660   ack_msg->header.header.size = htons (sizeof (struct 
661                                                GNUNET_STREAM_AckMessage));
662   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
663   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
664   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
665   ack_msg->receive_window_remaining = 
666     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
667
668   GNUNET_PEER_resolve (socket->other_peer, &target);
669   /* Request MESH for sending ACK */
670   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
671                                      0, /* Corking */
672                                      1, /* Priority */
673                                      socket->retransmit_timeout,
674                                      &target,
675                                      ntohs (ack_msg->header.header.size),
676                                      &send_ack_notify,
677                                      ack_msg);
678
679   
680 }
681
682
683 /**
684  * Function to modify a bit in GNUNET_STREAM_AckBitmap
685  *
686  * @param bitmap the bitmap to modify
687  * @param bit the bit number to modify
688  * @param value GNUNET_YES to on, GNUNET_NO to off
689  */
690 static void
691 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
692                       unsigned int bit, 
693                       int value)
694 {
695   GNUNET_assert (bit < 64);
696   if (GNUNET_YES == value)
697     *bitmap |= (1LL << bit);
698   else
699     *bitmap &= ~(1LL << bit);
700 }
701
702
703 /**
704  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
705  *
706  * @param bitmap address of the bitmap that has to be checked
707  * @param bit the bit number to check
708  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
709  */
710 static uint8_t
711 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
712                       unsigned int bit)
713 {
714   GNUNET_assert (bit < 64);
715   return 0 != (*bitmap & (1LL << bit));
716 }
717
718
719
720 /**
721  * Function called when Data Message is sent
722  *
723  * @param cls the io_handle corresponding to the Data Message
724  * @param socket the socket which was used
725  */
726 static void
727 write_data_finish_cb (void *cls,
728                       struct GNUNET_STREAM_Socket *socket)
729 {
730   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
731
732   io_handle->sent_packets++;
733 }
734
735
736 /**
737  * Writes data using the given socket. The amount of data written is limited by
738  * the receiver_window_size
739  *
740  * @param socket the socket to use
741  */
742 static void 
743 write_data (struct GNUNET_STREAM_Socket *socket)
744 {
745   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
746   int packet;                   /* Although an int, should never be negative */
747   int ack_packet;
748
749   ack_packet = -1;
750   /* Find the last acknowledged packet */
751   for (packet=0; packet < 64; packet++)
752     {      
753       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
754                                               packet))
755         ack_packet = packet;        
756       else if (NULL == io_handle->messages[packet])
757         break;
758     }
759   /* Resend packets which weren't ack'ed */
760   for (packet=0; packet < ack_packet; packet++)
761     {
762       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
763                                              packet))
764         {
765           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766                       "%x: Placing DATA message with sequence %u in send queue\n",
767                       socket->our_id,
768                       ntohl (io_handle->messages[packet]->sequence_number));
769
770           copy_and_queue_message (socket,
771                                   &io_handle->messages[packet]->header,
772                                   NULL,
773                                   NULL);
774         }
775     }
776   packet = ack_packet + 1;
777   /* Now send new packets if there is enough buffer space */
778   while ( (NULL != io_handle->messages[packet]) &&
779           (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
780     {
781       socket->receiver_window_available -= 
782         ntohs (io_handle->messages[packet]->header.header.size);
783       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784                   "%x: Placing DATA message with sequence %u in send queue\n",
785                   socket->our_id,
786                   ntohl (io_handle->messages[packet]->sequence_number));
787       copy_and_queue_message (socket,
788                               &io_handle->messages[packet]->header,
789                               &write_data_finish_cb,
790                               io_handle);
791       packet++;
792     }
793
794   GNUNET_SCHEDULER_add_delayed
795     (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
796      &retransmission_timeout_task,
797      socket);                                
798 }
799
800
801 /**
802  * Task for calling the read processor
803  *
804  * @param cls the socket
805  * @param tc the task context
806  */
807 static void
808 call_read_processor (void *cls,
809                           const struct GNUNET_SCHEDULER_TaskContext *tc)
810 {
811   struct GNUNET_STREAM_Socket *socket = cls;
812   size_t read_size;
813   size_t valid_read_size;
814   unsigned int packet;
815   uint32_t sequence_increase;
816   uint32_t offset_increase;
817
818   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
819   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
820     return;
821
822   GNUNET_assert (NULL != socket->read_handle);
823   GNUNET_assert (NULL != socket->read_handle->proc);
824
825   /* Check the bitmap for any holes */
826   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
827     {
828       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
829                                              packet))
830         break;
831     }
832   /* We only call read processor if we have the first packet */
833   GNUNET_assert (0 < packet);
834
835   valid_read_size = 
836     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
837
838   GNUNET_assert (0 != valid_read_size);
839
840   /* Cancel the read_io_timeout_task */
841   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
842   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
843
844   /* Call the data processor */
845   read_size = 
846     socket->read_handle->proc (socket->read_handle->proc_cls,
847                                socket->status,
848                                socket->receive_buffer + socket->copy_offset,
849                                valid_read_size);
850   /* Free the read handle */
851   GNUNET_free (socket->read_handle);
852   socket->read_handle = NULL;
853
854   GNUNET_assert (read_size <= valid_read_size);
855   socket->copy_offset += read_size;
856
857   /* Determine upto which packet we can remove from the buffer */
858   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
859     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
860       break;
861
862   /* If no packets can be removed we can't move the buffer */
863   if (0 == packet) return;
864
865   sequence_increase = packet;
866
867   /* Shift the data in the receive buffer */
868   memmove (socket->receive_buffer,
869            socket->receive_buffer 
870            + socket->receive_buffer_boundaries[sequence_increase-1],
871            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
872   
873   /* Shift the bitmap */
874   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
875   
876   /* Set read_sequence_number */
877   socket->read_sequence_number += sequence_increase;
878   
879   /* Set read_offset */
880   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
881   socket->read_offset += offset_increase;
882
883   /* Fix copy_offset */
884   GNUNET_assert (offset_increase <= socket->copy_offset);
885   socket->copy_offset -= offset_increase;
886   
887   /* Fix relative boundaries */
888   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
889     {
890       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
891         {
892           socket->receive_buffer_boundaries[packet] = 
893             socket->receive_buffer_boundaries[packet + sequence_increase] 
894             - offset_increase;
895         }
896       else
897         socket->receive_buffer_boundaries[packet] = 0;
898     }
899 }
900
901
902 /**
903  * Cancels the existing read io handle
904  *
905  * @param cls the closure from the SCHEDULER call
906  * @param tc the task context
907  */
908 static void
909 read_io_timeout (void *cls, 
910                 const struct GNUNET_SCHEDULER_TaskContext *tc)
911 {
912   struct GNUNET_STREAM_Socket *socket = cls;
913
914   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
915   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
916   {
917     GNUNET_SCHEDULER_cancel (socket->read_task_id);
918     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
919   }
920   GNUNET_assert (NULL != socket->read_handle);
921   
922   GNUNET_free (socket->read_handle);
923   socket->read_handle = NULL;
924 }
925
926
927 /**
928  * Handler for DATA messages; Same for both client and server
929  *
930  * @param socket the socket through which the ack was received
931  * @param tunnel connection to the other end
932  * @param sender who sent the message
933  * @param msg the data message
934  * @param atsi performance data for the connection
935  * @return GNUNET_OK to keep the connection open,
936  *         GNUNET_SYSERR to close it (signal serious error)
937  */
938 static int
939 handle_data (struct GNUNET_STREAM_Socket *socket,
940              struct GNUNET_MESH_Tunnel *tunnel,
941              const struct GNUNET_PeerIdentity *sender,
942              const struct GNUNET_STREAM_DataMessage *msg,
943              const struct GNUNET_ATS_Information*atsi)
944 {
945   const void *payload;
946   uint32_t bytes_needed;
947   uint32_t relative_offset;
948   uint32_t relative_sequence_number;
949   uint16_t size;
950
951   size = htons (msg->header.header.size);
952   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
953     {
954       GNUNET_break_op (0);
955       return GNUNET_SYSERR;
956     }
957
958   if (GNUNET_PEER_search (sender) != socket->other_peer)
959     {
960       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961                   "%x: Received DATA from non-confirming peer\n",
962                   socket->our_id);
963       return GNUNET_YES;
964     }
965
966   switch (socket->state)
967     {
968     case STATE_ESTABLISHED:
969     case STATE_TRANSMIT_CLOSED:
970     case STATE_TRANSMIT_CLOSE_WAIT:
971       
972       /* check if the message's sequence number is in the range we are
973          expecting */
974       relative_sequence_number = 
975         ntohl (msg->sequence_number) - socket->read_sequence_number;
976       if ( relative_sequence_number > 64)
977         {
978           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979                       "%x: Ignoring received message with sequence number %u\n",
980                       socket->our_id,
981                       ntohl (msg->sequence_number));
982           return GNUNET_YES;
983         }
984
985       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986                   "%x: Receiving DATA with sequence number: %u and size: %d from %x\n",
987                   socket->our_id,
988                   ntohl (msg->sequence_number),
989                   ntohs (msg->header.header.size),
990                   socket->other_peer);
991
992       /* Check if we have to allocate the buffer */
993       size -= sizeof (struct GNUNET_STREAM_DataMessage);
994       relative_offset = ntohl (msg->offset) - socket->read_offset;
995       bytes_needed = relative_offset + size;
996       
997       if (bytes_needed > socket->receive_buffer_size)
998         {
999           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1000             {
1001               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1002                                                        bytes_needed);
1003               socket->receive_buffer_size = bytes_needed;
1004             }
1005           else
1006             {
1007               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1008                           "%x: Cannot accommodate packet %d as buffer is",
1009                           "full\n",
1010                           socket->our_id,
1011                           ntohl (msg->sequence_number));
1012               return GNUNET_YES;
1013             }
1014         }
1015       
1016       /* Copy Data to buffer */
1017       payload = &msg[1];
1018       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1019       memcpy (socket->receive_buffer + relative_offset,
1020               payload,
1021               size);
1022       socket->receive_buffer_boundaries[relative_sequence_number] = 
1023         relative_offset + size;
1024       
1025       /* Modify the ACK bitmap */
1026       ackbitmap_modify_bit (&socket->ack_bitmap,
1027                             relative_sequence_number,
1028                             GNUNET_YES);
1029
1030       /* Start ACK sending task if one is not already present */
1031       if (0 == socket->ack_task_id)
1032        {
1033          socket->ack_task_id = 
1034            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1035                                          (msg->ack_deadline),
1036                                          &ack_task,
1037                                          socket);
1038        }
1039
1040       if ((NULL != socket->read_handle) /* A read handle is waiting */
1041           /* There is no current read task */
1042           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1043           /* We have the first packet */
1044           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1045                                                  0)))
1046         {
1047           socket->read_task_id = 
1048             GNUNET_SCHEDULER_add_now (&call_read_processor,
1049                                       socket);
1050         }
1051       
1052       break;
1053
1054     default:
1055       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056                   "%x: Received data message when it cannot be handled\n",
1057                   socket->our_id);
1058       break;
1059     }
1060   return GNUNET_YES;
1061 }
1062
1063 /**
1064  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1065  *
1066  * @param cls the socket (set from GNUNET_MESH_connect)
1067  * @param tunnel connection to the other end
1068  * @param tunnel_ctx place to store local state associated with the tunnel
1069  * @param sender who sent the message
1070  * @param message the actual message
1071  * @param atsi performance data for the connection
1072  * @return GNUNET_OK to keep the connection open,
1073  *         GNUNET_SYSERR to close it (signal serious error)
1074  */
1075 static int
1076 client_handle_data (void *cls,
1077              struct GNUNET_MESH_Tunnel *tunnel,
1078              void **tunnel_ctx,
1079              const struct GNUNET_PeerIdentity *sender,
1080              const struct GNUNET_MessageHeader *message,
1081              const struct GNUNET_ATS_Information*atsi)
1082 {
1083   struct GNUNET_STREAM_Socket *socket = cls;
1084
1085   return handle_data (socket, 
1086                       tunnel, 
1087                       sender, 
1088                       (const struct GNUNET_STREAM_DataMessage *) message, 
1089                       atsi);
1090 }
1091
1092
1093 /**
1094  * Callback to set state to ESTABLISHED
1095  *
1096  * @param cls the closure from queue_message FIXME: document
1097  * @param socket the socket to requiring state change
1098  */
1099 static void
1100 set_state_established (void *cls,
1101                        struct GNUNET_STREAM_Socket *socket)
1102 {
1103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1104               "%x: Attaining ESTABLISHED state\n",
1105               socket->our_id);
1106   socket->write_offset = 0;
1107   socket->read_offset = 0;
1108   socket->state = STATE_ESTABLISHED;
1109   if (socket->open_cb)
1110     socket->open_cb (socket->open_cls, socket);
1111 }
1112
1113
1114 /**
1115  * Callback to set state to HELLO_WAIT
1116  *
1117  * @param cls the closure from queue_message
1118  * @param socket the socket to requiring state change
1119  */
1120 static void
1121 set_state_hello_wait (void *cls,
1122                       struct GNUNET_STREAM_Socket *socket)
1123 {
1124   GNUNET_assert (STATE_INIT == socket->state);
1125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1126               "%x: Attaining HELLO_WAIT state\n",
1127               socket->our_id);
1128   socket->state = STATE_HELLO_WAIT;
1129 }
1130
1131
1132 /**
1133  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1134  *
1135  * @param cls the socket (set from GNUNET_MESH_connect)
1136  * @param tunnel connection to the other end
1137  * @param tunnel_ctx this is NULL
1138  * @param sender who sent the message
1139  * @param message the actual message
1140  * @param atsi performance data for the connection
1141  * @return GNUNET_OK to keep the connection open,
1142  *         GNUNET_SYSERR to close it (signal serious error)
1143  */
1144 static int
1145 client_handle_hello_ack (void *cls,
1146                          struct GNUNET_MESH_Tunnel *tunnel,
1147                          void **tunnel_ctx,
1148                          const struct GNUNET_PeerIdentity *sender,
1149                          const struct GNUNET_MessageHeader *message,
1150                          const struct GNUNET_ATS_Information*atsi)
1151 {
1152   struct GNUNET_STREAM_Socket *socket = cls;
1153   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1154   struct GNUNET_STREAM_HelloAckMessage *reply;
1155
1156   if (GNUNET_PEER_search (sender) != socket->other_peer)
1157     {
1158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159                   "%x: Received HELLO_ACK from non-confirming peer\n",
1160                   socket->our_id);
1161       return GNUNET_YES;
1162     }
1163   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165               "%x: Received HELLO_ACK from %x\n",
1166               socket->our_id,
1167               socket->other_peer);
1168
1169   GNUNET_assert (socket->tunnel == tunnel);
1170   switch (socket->state)
1171   {
1172   case STATE_HELLO_WAIT:
1173     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1174     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175                 "%x: Read sequence number %u\n",
1176                 socket->our_id,
1177                 (unsigned int) socket->read_sequence_number);
1178     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1179     /* Get the random sequence number */
1180     socket->write_sequence_number = 
1181       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1182       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1183                   "%x: Generated write sequence number %u\n",
1184                   socket->our_id,
1185                   (unsigned int) socket->write_sequence_number);
1186     reply = 
1187       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1188     reply->header.header.size = 
1189       htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1190     reply->header.header.type = 
1191       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1192     reply->sequence_number = htonl (socket->write_sequence_number);
1193     reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1194     queue_message (socket, 
1195                    &reply->header, 
1196                    &set_state_established, 
1197                    NULL);      
1198     return GNUNET_OK;
1199   case STATE_ESTABLISHED:
1200   case STATE_RECEIVE_CLOSE_WAIT:
1201     // call statistics (# ACKs ignored++)
1202     return GNUNET_OK;
1203   case STATE_INIT:
1204   default:
1205     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1207                 socket->our_id,
1208                 socket->other_peer,
1209                 socket->state);
1210     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1211     return GNUNET_SYSERR;
1212   }
1213
1214 }
1215
1216
1217 /**
1218  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1219  *
1220  * @param cls the socket (set from GNUNET_MESH_connect)
1221  * @param tunnel connection to the other end
1222  * @param tunnel_ctx this is NULL
1223  * @param sender who sent the message
1224  * @param message the actual message
1225  * @param atsi performance data for the connection
1226  * @return GNUNET_OK to keep the connection open,
1227  *         GNUNET_SYSERR to close it (signal serious error)
1228  */
1229 static int
1230 client_handle_reset (void *cls,
1231                      struct GNUNET_MESH_Tunnel *tunnel,
1232                      void **tunnel_ctx,
1233                      const struct GNUNET_PeerIdentity *sender,
1234                      const struct GNUNET_MessageHeader *message,
1235                      const struct GNUNET_ATS_Information*atsi)
1236 {
1237   struct GNUNET_STREAM_Socket *socket = cls;
1238
1239   return GNUNET_OK;
1240 }
1241
1242
1243 /**
1244  * Common message handler for handling TRANSMIT_CLOSE messages
1245  *
1246  * @param socket the socket through which the ack was received
1247  * @param tunnel connection to the other end
1248  * @param sender who sent the message
1249  * @param msg the transmit close message
1250  * @param atsi performance data for the connection
1251  * @return GNUNET_OK to keep the connection open,
1252  *         GNUNET_SYSERR to close it (signal serious error)
1253  */
1254 static int
1255 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1256                        struct GNUNET_MESH_Tunnel *tunnel,
1257                        const struct GNUNET_PeerIdentity *sender,
1258                        const struct GNUNET_STREAM_MessageHeader *msg,
1259                        const struct GNUNET_ATS_Information*atsi)
1260 {
1261   struct GNUNET_STREAM_MessageHeader *reply;
1262
1263   switch (socket->state)
1264     {
1265     case STATE_ESTABLISHED:
1266       socket->state = STATE_RECEIVE_CLOSED;
1267
1268       /* Send TRANSMIT_CLOSE_ACK */
1269       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1270       reply->header.type = 
1271         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1272       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1273       queue_message (socket, reply, NULL, NULL);
1274       break;
1275
1276     default:
1277       /* FIXME: Call statistics? */
1278       break;
1279     }
1280   return GNUNET_YES;
1281 }
1282
1283
1284 /**
1285  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1286  *
1287  * @param cls the socket (set from GNUNET_MESH_connect)
1288  * @param tunnel connection to the other end
1289  * @param tunnel_ctx this is NULL
1290  * @param sender who sent the message
1291  * @param message the actual message
1292  * @param atsi performance data for the connection
1293  * @return GNUNET_OK to keep the connection open,
1294  *         GNUNET_SYSERR to close it (signal serious error)
1295  */
1296 static int
1297 client_handle_transmit_close (void *cls,
1298                               struct GNUNET_MESH_Tunnel *tunnel,
1299                               void **tunnel_ctx,
1300                               const struct GNUNET_PeerIdentity *sender,
1301                               const struct GNUNET_MessageHeader *message,
1302                               const struct GNUNET_ATS_Information*atsi)
1303 {
1304   struct GNUNET_STREAM_Socket *socket = cls;
1305   
1306   return handle_transmit_close (socket,
1307                                 tunnel,
1308                                 sender,
1309                                 (struct GNUNET_STREAM_MessageHeader *)message,
1310                                 atsi);
1311 }
1312
1313
1314 /**
1315  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1316  *
1317  * @param cls the socket (set from GNUNET_MESH_connect)
1318  * @param tunnel connection to the other end
1319  * @param tunnel_ctx this is NULL
1320  * @param sender who sent the message
1321  * @param message the actual message
1322  * @param atsi performance data for the connection
1323  * @return GNUNET_OK to keep the connection open,
1324  *         GNUNET_SYSERR to close it (signal serious error)
1325  */
1326 static int
1327 client_handle_transmit_close_ack (void *cls,
1328                                   struct GNUNET_MESH_Tunnel *tunnel,
1329                                   void **tunnel_ctx,
1330                                   const struct GNUNET_PeerIdentity *sender,
1331                                   const struct GNUNET_MessageHeader *message,
1332                                   const struct GNUNET_ATS_Information*atsi)
1333 {
1334   struct GNUNET_STREAM_Socket *socket = cls;
1335
1336   return GNUNET_OK;
1337 }
1338
1339
1340 /**
1341  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1342  *
1343  * @param cls the socket (set from GNUNET_MESH_connect)
1344  * @param tunnel connection to the other end
1345  * @param tunnel_ctx this is NULL
1346  * @param sender who sent the message
1347  * @param message the actual message
1348  * @param atsi performance data for the connection
1349  * @return GNUNET_OK to keep the connection open,
1350  *         GNUNET_SYSERR to close it (signal serious error)
1351  */
1352 static int
1353 client_handle_receive_close (void *cls,
1354                              struct GNUNET_MESH_Tunnel *tunnel,
1355                              void **tunnel_ctx,
1356                              const struct GNUNET_PeerIdentity *sender,
1357                              const struct GNUNET_MessageHeader *message,
1358                              const struct GNUNET_ATS_Information*atsi)
1359 {
1360   struct GNUNET_STREAM_Socket *socket = cls;
1361
1362   return GNUNET_OK;
1363 }
1364
1365
1366 /**
1367  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1368  *
1369  * @param cls the socket (set from GNUNET_MESH_connect)
1370  * @param tunnel connection to the other end
1371  * @param tunnel_ctx this is NULL
1372  * @param sender who sent the message
1373  * @param message the actual message
1374  * @param atsi performance data for the connection
1375  * @return GNUNET_OK to keep the connection open,
1376  *         GNUNET_SYSERR to close it (signal serious error)
1377  */
1378 static int
1379 client_handle_receive_close_ack (void *cls,
1380                                  struct GNUNET_MESH_Tunnel *tunnel,
1381                                  void **tunnel_ctx,
1382                                  const struct GNUNET_PeerIdentity *sender,
1383                                  const struct GNUNET_MessageHeader *message,
1384                                  const struct GNUNET_ATS_Information*atsi)
1385 {
1386   struct GNUNET_STREAM_Socket *socket = cls;
1387
1388   return GNUNET_OK;
1389 }
1390
1391
1392 /**
1393  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1394  *
1395  * @param cls the socket (set from GNUNET_MESH_connect)
1396  * @param tunnel connection to the other end
1397  * @param tunnel_ctx this is NULL
1398  * @param sender who sent the message
1399  * @param message the actual message
1400  * @param atsi performance data for the connection
1401  * @return GNUNET_OK to keep the connection open,
1402  *         GNUNET_SYSERR to close it (signal serious error)
1403  */
1404 static int
1405 client_handle_close (void *cls,
1406                      struct GNUNET_MESH_Tunnel *tunnel,
1407                      void **tunnel_ctx,
1408                      const struct GNUNET_PeerIdentity *sender,
1409                      const struct GNUNET_MessageHeader *message,
1410                      const struct GNUNET_ATS_Information*atsi)
1411 {
1412   struct GNUNET_STREAM_Socket *socket = cls;
1413
1414   return GNUNET_OK;
1415 }
1416
1417
1418 /**
1419  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1420  *
1421  * @param cls the socket (set from GNUNET_MESH_connect)
1422  * @param tunnel connection to the other end
1423  * @param tunnel_ctx this is NULL
1424  * @param sender who sent the message
1425  * @param message the actual message
1426  * @param atsi performance data for the connection
1427  * @return GNUNET_OK to keep the connection open,
1428  *         GNUNET_SYSERR to close it (signal serious error)
1429  */
1430 static int
1431 client_handle_close_ack (void *cls,
1432                          struct GNUNET_MESH_Tunnel *tunnel,
1433                          void **tunnel_ctx,
1434                          const struct GNUNET_PeerIdentity *sender,
1435                          const struct GNUNET_MessageHeader *message,
1436                          const struct GNUNET_ATS_Information*atsi)
1437 {
1438   struct GNUNET_STREAM_Socket *socket = cls;
1439
1440   return GNUNET_OK;
1441 }
1442
1443 /*****************************/
1444 /* Server's Message Handlers */
1445 /*****************************/
1446
1447 /**
1448  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1449  *
1450  * @param cls the closure
1451  * @param tunnel connection to the other end
1452  * @param tunnel_ctx the socket
1453  * @param sender who sent the message
1454  * @param message the actual message
1455  * @param atsi performance data for the connection
1456  * @return GNUNET_OK to keep the connection open,
1457  *         GNUNET_SYSERR to close it (signal serious error)
1458  */
1459 static int
1460 server_handle_data (void *cls,
1461                     struct GNUNET_MESH_Tunnel *tunnel,
1462                     void **tunnel_ctx,
1463                     const struct GNUNET_PeerIdentity *sender,
1464                     const struct GNUNET_MessageHeader *message,
1465                     const struct GNUNET_ATS_Information*atsi)
1466 {
1467   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1468
1469   return handle_data (socket,
1470                       tunnel,
1471                       sender,
1472                       (const struct GNUNET_STREAM_DataMessage *)message,
1473                       atsi);
1474 }
1475
1476
1477 /**
1478  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1479  *
1480  * @param cls the closure
1481  * @param tunnel connection to the other end
1482  * @param tunnel_ctx the socket
1483  * @param sender who sent the message
1484  * @param message the actual message
1485  * @param atsi performance data for the connection
1486  * @return GNUNET_OK to keep the connection open,
1487  *         GNUNET_SYSERR to close it (signal serious error)
1488  */
1489 static int
1490 server_handle_hello (void *cls,
1491                      struct GNUNET_MESH_Tunnel *tunnel,
1492                      void **tunnel_ctx,
1493                      const struct GNUNET_PeerIdentity *sender,
1494                      const struct GNUNET_MessageHeader *message,
1495                      const struct GNUNET_ATS_Information*atsi)
1496 {
1497   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1498   struct GNUNET_STREAM_HelloAckMessage *reply;
1499
1500   if (GNUNET_PEER_search (sender) != socket->other_peer)
1501     {
1502       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1503                   "%x: Received HELLO from non-confirming peer\n",
1504                   socket->our_id);
1505       return GNUNET_YES;
1506     }
1507
1508   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1509                  ntohs (message->type));
1510   GNUNET_assert (socket->tunnel == tunnel);
1511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512               "%x: Received HELLO from %x\n", 
1513               socket->our_id,
1514               socket->other_peer);
1515
1516   if (STATE_INIT == socket->state)
1517     {
1518       /* Get the random sequence number */
1519       socket->write_sequence_number = 
1520         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1521       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1522                   "%x: Generated write sequence number %u\n",
1523                   socket->our_id,
1524                   (unsigned int) socket->write_sequence_number);
1525       reply = 
1526         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1527       reply->header.header.size = 
1528         htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1529       reply->header.header.type = 
1530         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1531       reply->sequence_number = htonl (socket->write_sequence_number);
1532       reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1533       queue_message (socket, 
1534                      &reply->header,
1535                      &set_state_hello_wait, 
1536                      NULL);
1537     }
1538   else
1539     {
1540       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1541                   "Client sent HELLO when in state %d\n", socket->state);
1542       /* FIXME: Send RESET? */
1543       
1544     }
1545   return GNUNET_OK;
1546 }
1547
1548
1549 /**
1550  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1551  *
1552  * @param cls the closure
1553  * @param tunnel connection to the other end
1554  * @param tunnel_ctx the socket
1555  * @param sender who sent the message
1556  * @param message the actual message
1557  * @param atsi performance data for the connection
1558  * @return GNUNET_OK to keep the connection open,
1559  *         GNUNET_SYSERR to close it (signal serious error)
1560  */
1561 static int
1562 server_handle_hello_ack (void *cls,
1563                          struct GNUNET_MESH_Tunnel *tunnel,
1564                          void **tunnel_ctx,
1565                          const struct GNUNET_PeerIdentity *sender,
1566                          const struct GNUNET_MessageHeader *message,
1567                          const struct GNUNET_ATS_Information*atsi)
1568 {
1569   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1570   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1571
1572   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1573                  ntohs (message->type));
1574   GNUNET_assert (socket->tunnel == tunnel);
1575   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1576   if (STATE_HELLO_WAIT == socket->state)
1577     {
1578       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1579                   "%x: Received HELLO_ACK from %x\n",
1580                   socket->our_id,
1581                   socket->other_peer);
1582       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1583       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1584                   "%x: Read sequence number %u\n",
1585                   socket->our_id,
1586                   (unsigned int) socket->read_sequence_number);
1587       socket->receiver_window_available = 
1588         ntohl (ack_message->receiver_window_size);
1589       /* Attain ESTABLISHED state */
1590       set_state_established (NULL, socket);
1591     }
1592   else
1593     {
1594       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1595                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1596       /* FIXME: Send RESET? */
1597       
1598     }
1599   return GNUNET_OK;
1600 }
1601
1602
1603 /**
1604  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1605  *
1606  * @param cls the closure
1607  * @param tunnel connection to the other end
1608  * @param tunnel_ctx the socket
1609  * @param sender who sent the message
1610  * @param message the actual message
1611  * @param atsi performance data for the connection
1612  * @return GNUNET_OK to keep the connection open,
1613  *         GNUNET_SYSERR to close it (signal serious error)
1614  */
1615 static int
1616 server_handle_reset (void *cls,
1617                      struct GNUNET_MESH_Tunnel *tunnel,
1618                      void **tunnel_ctx,
1619                      const struct GNUNET_PeerIdentity *sender,
1620                      const struct GNUNET_MessageHeader *message,
1621                      const struct GNUNET_ATS_Information*atsi)
1622 {
1623   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1624
1625   return GNUNET_OK;
1626 }
1627
1628
1629 /**
1630  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1631  *
1632  * @param cls the closure
1633  * @param tunnel connection to the other end
1634  * @param tunnel_ctx the socket
1635  * @param sender who sent the message
1636  * @param message the actual message
1637  * @param atsi performance data for the connection
1638  * @return GNUNET_OK to keep the connection open,
1639  *         GNUNET_SYSERR to close it (signal serious error)
1640  */
1641 static int
1642 server_handle_transmit_close (void *cls,
1643                               struct GNUNET_MESH_Tunnel *tunnel,
1644                               void **tunnel_ctx,
1645                               const struct GNUNET_PeerIdentity *sender,
1646                               const struct GNUNET_MessageHeader *message,
1647                               const struct GNUNET_ATS_Information*atsi)
1648 {
1649   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1650
1651   return handle_transmit_close (socket,
1652                                 tunnel,
1653                                 sender,
1654                                 (struct GNUNET_STREAM_MessageHeader *)message,
1655                                 atsi);
1656 }
1657
1658
1659 /**
1660  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1661  *
1662  * @param cls the closure
1663  * @param tunnel connection to the other end
1664  * @param tunnel_ctx the socket
1665  * @param sender who sent the message
1666  * @param message the actual message
1667  * @param atsi performance data for the connection
1668  * @return GNUNET_OK to keep the connection open,
1669  *         GNUNET_SYSERR to close it (signal serious error)
1670  */
1671 static int
1672 server_handle_transmit_close_ack (void *cls,
1673                                   struct GNUNET_MESH_Tunnel *tunnel,
1674                                   void **tunnel_ctx,
1675                                   const struct GNUNET_PeerIdentity *sender,
1676                                   const struct GNUNET_MessageHeader *message,
1677                                   const struct GNUNET_ATS_Information*atsi)
1678 {
1679   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1680
1681   return GNUNET_OK;
1682 }
1683
1684
1685 /**
1686  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1687  *
1688  * @param cls the closure
1689  * @param tunnel connection to the other end
1690  * @param tunnel_ctx the socket
1691  * @param sender who sent the message
1692  * @param message the actual message
1693  * @param atsi performance data for the connection
1694  * @return GNUNET_OK to keep the connection open,
1695  *         GNUNET_SYSERR to close it (signal serious error)
1696  */
1697 static int
1698 server_handle_receive_close (void *cls,
1699                              struct GNUNET_MESH_Tunnel *tunnel,
1700                              void **tunnel_ctx,
1701                              const struct GNUNET_PeerIdentity *sender,
1702                              const struct GNUNET_MessageHeader *message,
1703                              const struct GNUNET_ATS_Information*atsi)
1704 {
1705   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1706
1707   return GNUNET_OK;
1708 }
1709
1710
1711 /**
1712  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1713  *
1714  * @param cls the closure
1715  * @param tunnel connection to the other end
1716  * @param tunnel_ctx the socket
1717  * @param sender who sent the message
1718  * @param message the actual message
1719  * @param atsi performance data for the connection
1720  * @return GNUNET_OK to keep the connection open,
1721  *         GNUNET_SYSERR to close it (signal serious error)
1722  */
1723 static int
1724 server_handle_receive_close_ack (void *cls,
1725                                  struct GNUNET_MESH_Tunnel *tunnel,
1726                                  void **tunnel_ctx,
1727                                  const struct GNUNET_PeerIdentity *sender,
1728                                  const struct GNUNET_MessageHeader *message,
1729                                  const struct GNUNET_ATS_Information*atsi)
1730 {
1731   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1732
1733   return GNUNET_OK;
1734 }
1735
1736
1737 /**
1738  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1739  *
1740  * @param cls the closure
1741  * @param tunnel connection to the other end
1742  * @param tunnel_ctx the socket
1743  * @param sender who sent the message
1744  * @param message the actual message
1745  * @param atsi performance data for the connection
1746  * @return GNUNET_OK to keep the connection open,
1747  *         GNUNET_SYSERR to close it (signal serious error)
1748  */
1749 static int
1750 server_handle_close (void *cls,
1751                      struct GNUNET_MESH_Tunnel *tunnel,
1752                      void **tunnel_ctx,
1753                      const struct GNUNET_PeerIdentity *sender,
1754                      const struct GNUNET_MessageHeader *message,
1755                      const struct GNUNET_ATS_Information*atsi)
1756 {
1757   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1758
1759   return GNUNET_OK;
1760 }
1761
1762
1763 /**
1764  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1765  *
1766  * @param cls the closure
1767  * @param tunnel connection to the other end
1768  * @param tunnel_ctx the socket
1769  * @param sender who sent the message
1770  * @param message the actual message
1771  * @param atsi performance data for the connection
1772  * @return GNUNET_OK to keep the connection open,
1773  *         GNUNET_SYSERR to close it (signal serious error)
1774  */
1775 static int
1776 server_handle_close_ack (void *cls,
1777                          struct GNUNET_MESH_Tunnel *tunnel,
1778                          void **tunnel_ctx,
1779                          const struct GNUNET_PeerIdentity *sender,
1780                          const struct GNUNET_MessageHeader *message,
1781                          const struct GNUNET_ATS_Information*atsi)
1782 {
1783   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1784
1785   return GNUNET_OK;
1786 }
1787
1788
1789 /**
1790  * Message Handler for mesh
1791  *
1792  * @param socket the socket through which the ack was received
1793  * @param tunnel connection to the other end
1794  * @param sender who sent the message
1795  * @param ack the acknowledgment message
1796  * @param atsi performance data for the connection
1797  * @return GNUNET_OK to keep the connection open,
1798  *         GNUNET_SYSERR to close it (signal serious error)
1799  */
1800 static int
1801 handle_ack (struct GNUNET_STREAM_Socket *socket,
1802             struct GNUNET_MESH_Tunnel *tunnel,
1803             const struct GNUNET_PeerIdentity *sender,
1804             const struct GNUNET_STREAM_AckMessage *ack,
1805             const struct GNUNET_ATS_Information*atsi)
1806 {
1807   unsigned int packet;
1808   int need_retransmission;
1809
1810   if (GNUNET_PEER_search (sender) != socket->other_peer)
1811     {
1812       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1813                   "%x: Received ACK from non-confirming peer\n",
1814                   socket->our_id);
1815       return GNUNET_YES;
1816     }
1817
1818   switch (socket->state)
1819     {
1820     case (STATE_ESTABLISHED):
1821       if (NULL == socket->write_handle)
1822         {
1823           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1824                       "%x: Received DATA_ACK when write_handle is NULL\n",
1825                       socket->our_id);
1826           return GNUNET_OK;
1827         }
1828       
1829       if (!((socket->write_sequence_number 
1830              - htonl (ack->base_sequence_number)) < 64))
1831         {
1832           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1833                       "%x: Received DATA_ACK with unexpected base sequence",
1834                       "number\n",
1835                       socket->our_id);
1836           return GNUNET_OK;
1837         }
1838       /* FIXME: include the case when write_handle is cancelled - ignore the 
1839          acks */
1840       
1841       /* Cancel the retransmission task */
1842       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1843         {
1844           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1845           socket->retransmission_timeout_task_id = 
1846             GNUNET_SCHEDULER_NO_TASK;
1847         }
1848          
1849       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1850       socket->receiver_window_available = 
1851         ntohl (ack->receive_window_remaining);
1852
1853       /* Check if we have received all acknowledgements */
1854       need_retransmission = GNUNET_NO;
1855       for (packet=0; packet < 64; packet++)
1856         {
1857           if (NULL == socket->write_handle->messages[packet]) break;
1858           if (GNUNET_YES != ackbitmap_is_bit_set 
1859               (&socket->write_handle->ack_bitmap,packet))
1860             {
1861               need_retransmission = GNUNET_YES;
1862               break;
1863             }
1864         }
1865       if (GNUNET_YES == need_retransmission)
1866         {
1867           write_data (socket);
1868         }
1869       else      /* We have to call the write continuation callback now */
1870         {
1871
1872           /* Free the packets */
1873           for (packet=0; packet < 64; packet++)
1874             {
1875               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1876             }
1877           if (NULL != socket->write_handle->write_cont)
1878             socket->write_handle->write_cont
1879               (socket->write_handle->write_cont_cls,
1880                socket->status,
1881                socket->write_handle->size);
1882           /* We are done with the write handle - Freeing it */
1883           GNUNET_free (socket->write_handle);
1884           socket->write_handle = NULL;
1885         }
1886       break;
1887     default:
1888       break;
1889     }
1890   return GNUNET_OK;
1891 }
1892
1893
1894 /**
1895  * Message Handler for mesh
1896  *
1897  * @param cls the 'struct GNUNET_STREAM_Socket'
1898  * @param tunnel connection to the other end
1899  * @param tunnel_ctx unused
1900  * @param sender who sent the message
1901  * @param message the actual message
1902  * @param atsi performance data for the connection
1903  * @return GNUNET_OK to keep the connection open,
1904  *         GNUNET_SYSERR to close it (signal serious error)
1905  */
1906 static int
1907 client_handle_ack (void *cls,
1908                    struct GNUNET_MESH_Tunnel *tunnel,
1909                    void **tunnel_ctx,
1910                    const struct GNUNET_PeerIdentity *sender,
1911                    const struct GNUNET_MessageHeader *message,
1912                    const struct GNUNET_ATS_Information*atsi)
1913 {
1914   struct GNUNET_STREAM_Socket *socket = cls;
1915   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1916  
1917   return handle_ack (socket, tunnel, sender, ack, atsi);
1918 }
1919
1920
1921 /**
1922  * Message Handler for mesh
1923  *
1924  * @param cls the server's listen socket
1925  * @param tunnel connection to the other end
1926  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1927  * @param sender who sent the message
1928  * @param message the actual message
1929  * @param atsi performance data for the connection
1930  * @return GNUNET_OK to keep the connection open,
1931  *         GNUNET_SYSERR to close it (signal serious error)
1932  */
1933 static int
1934 server_handle_ack (void *cls,
1935                    struct GNUNET_MESH_Tunnel *tunnel,
1936                    void **tunnel_ctx,
1937                    const struct GNUNET_PeerIdentity *sender,
1938                    const struct GNUNET_MessageHeader *message,
1939                    const struct GNUNET_ATS_Information*atsi)
1940 {
1941   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1942   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1943  
1944   return handle_ack (socket, tunnel, sender, ack, atsi);
1945 }
1946
1947
1948 /**
1949  * For client message handlers, the stream socket is in the
1950  * closure argument.
1951  */
1952 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1953   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1954   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1955    sizeof (struct GNUNET_STREAM_AckMessage) },
1956   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1957    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1958   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1959    sizeof (struct GNUNET_STREAM_MessageHeader)},
1960   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1961    sizeof (struct GNUNET_STREAM_MessageHeader)},
1962   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1963    sizeof (struct GNUNET_STREAM_MessageHeader)},
1964   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1965    sizeof (struct GNUNET_STREAM_MessageHeader)},
1966   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1967    sizeof (struct GNUNET_STREAM_MessageHeader)},
1968   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1969    sizeof (struct GNUNET_STREAM_MessageHeader)},
1970   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1971    sizeof (struct GNUNET_STREAM_MessageHeader)},
1972   {NULL, 0, 0}
1973 };
1974
1975
1976 /**
1977  * For server message handlers, the stream socket is in the
1978  * tunnel context, and the listen socket in the closure argument.
1979  */
1980 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1981   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1982   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1983    sizeof (struct GNUNET_STREAM_AckMessage) },
1984   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1985    sizeof (struct GNUNET_STREAM_MessageHeader)},
1986   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1987    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1988   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1989    sizeof (struct GNUNET_STREAM_MessageHeader)},
1990   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1991    sizeof (struct GNUNET_STREAM_MessageHeader)},
1992   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1993    sizeof (struct GNUNET_STREAM_MessageHeader)},
1994   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1995    sizeof (struct GNUNET_STREAM_MessageHeader)},
1996   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1997    sizeof (struct GNUNET_STREAM_MessageHeader)},
1998   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1999    sizeof (struct GNUNET_STREAM_MessageHeader)},
2000   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2001    sizeof (struct GNUNET_STREAM_MessageHeader)},
2002   {NULL, 0, 0}
2003 };
2004
2005
2006 /**
2007  * Function called when our target peer is connected to our tunnel
2008  *
2009  * @param cls the socket for which this tunnel is created
2010  * @param peer the peer identity of the target
2011  * @param atsi performance data for the connection
2012  */
2013 static void
2014 mesh_peer_connect_callback (void *cls,
2015                             const struct GNUNET_PeerIdentity *peer,
2016                             const struct GNUNET_ATS_Information * atsi)
2017 {
2018   struct GNUNET_STREAM_Socket *socket = cls;
2019   struct GNUNET_STREAM_MessageHeader *message;
2020   GNUNET_PEER_Id connected_peer;
2021
2022   connected_peer = GNUNET_PEER_search (peer);
2023   
2024   if (connected_peer != socket->other_peer)
2025     {
2026       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2027                   "%x: A peer which is not our target has connected",
2028                   "to our tunnel\n",
2029                   socket->our_id);
2030       return;
2031     }
2032   
2033   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2034               "%x: Target peer %x connected\n", 
2035               socket->our_id,
2036               connected_peer);
2037   
2038   /* Set state to INIT */
2039   socket->state = STATE_INIT;
2040
2041   /* Send HELLO message */
2042   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2043   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2044   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2045   queue_message (socket,
2046                  message,
2047                  &set_state_hello_wait,
2048                  NULL);
2049
2050   /* Call open callback */
2051   if (NULL == socket->open_cb)
2052     {
2053       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2054                   "STREAM_open callback is NULL\n");
2055     }
2056 }
2057
2058
2059 /**
2060  * Function called when our target peer is disconnected from our tunnel
2061  *
2062  * @param cls the socket associated which this tunnel
2063  * @param peer the peer identity of the target
2064  */
2065 static void
2066 mesh_peer_disconnect_callback (void *cls,
2067                                const struct GNUNET_PeerIdentity *peer)
2068 {
2069
2070 }
2071
2072
2073 /**
2074  * Method called whenever a peer creates a tunnel to us
2075  *
2076  * @param cls closure
2077  * @param tunnel new handle to the tunnel
2078  * @param initiator peer that started the tunnel
2079  * @param atsi performance information for the tunnel
2080  * @return initial tunnel context for the tunnel
2081  *         (can be NULL -- that's not an error)
2082  */
2083 static void *
2084 new_tunnel_notify (void *cls,
2085                    struct GNUNET_MESH_Tunnel *tunnel,
2086                    const struct GNUNET_PeerIdentity *initiator,
2087                    const struct GNUNET_ATS_Information *atsi)
2088 {
2089   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2090   struct GNUNET_STREAM_Socket *socket;
2091
2092   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2093      from the same peer again until the socket is closed */
2094
2095   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2096   socket->other_peer = GNUNET_PEER_intern (initiator);
2097   socket->tunnel = tunnel;
2098   socket->session_id = 0;       /* FIXME */
2099   socket->state = STATE_INIT;
2100   socket->derived = GNUNET_YES;
2101   socket->our_id = lsocket->our_id;
2102   
2103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2104               "%x: Peer %x initiated tunnel to us\n", 
2105               socket->our_id,
2106               socket->other_peer);
2107   
2108   /* FIXME: Copy MESH handle from lsocket to socket */
2109   /* FIXME: What if listen_cb is NULL */
2110   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
2111                                            socket,
2112                                            initiator))
2113     {
2114       socket->state = STATE_CLOSED;
2115       /* FIXME: Send CLOSE message and then free */
2116       GNUNET_free (socket);
2117       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
2118     }
2119   return socket;
2120 }
2121
2122
2123 /**
2124  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2125  * any associated state.  This function is NOT called if the client has
2126  * explicitly asked for the tunnel to be destroyed using
2127  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2128  * the tunnel.
2129  *
2130  * @param cls closure (set from GNUNET_MESH_connect)
2131  * @param tunnel connection to the other end (henceforth invalid)
2132  * @param tunnel_ctx place where local state associated
2133  *                   with the tunnel is stored
2134  */
2135 static void 
2136 tunnel_cleaner (void *cls,
2137                 const struct GNUNET_MESH_Tunnel *tunnel,
2138                 void *tunnel_ctx)
2139 {
2140   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2141   
2142   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2143               "%x: Peer %x has terminated connection abruptly\n",
2144               socket->our_id,
2145               socket->other_peer);
2146
2147   socket->status = GNUNET_STREAM_SHUTDOWN;
2148
2149   /* Clear Transmit handles */
2150   if (NULL != socket->transmit_handle)
2151     {
2152       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2153       socket->transmit_handle = NULL;
2154     }
2155   socket->tunnel = NULL;
2156 }
2157
2158
2159 /*****************/
2160 /* API functions */
2161 /*****************/
2162
2163
2164 /**
2165  * Tries to open a stream to the target peer
2166  *
2167  * @param cfg configuration to use
2168  * @param target the target peer to which the stream has to be opened
2169  * @param app_port the application port number which uniquely identifies this
2170  *            stream
2171  * @param open_cb this function will be called after stream has be established 
2172  * @param open_cb_cls the closure for open_cb
2173  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2174  * @return if successful it returns the stream socket; NULL if stream cannot be
2175  *         opened 
2176  */
2177 struct GNUNET_STREAM_Socket *
2178 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2179                     const struct GNUNET_PeerIdentity *target,
2180                     GNUNET_MESH_ApplicationType app_port,
2181                     GNUNET_STREAM_OpenCallback open_cb,
2182                     void *open_cb_cls,
2183                     ...)
2184 {
2185   struct GNUNET_STREAM_Socket *socket;
2186   struct GNUNET_PeerIdentity own_peer_id;
2187   enum GNUNET_STREAM_Option option;
2188   va_list vargs;                /* Variable arguments */
2189
2190   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2191               "%s\n", __func__);
2192
2193   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2194   socket->other_peer = GNUNET_PEER_intern (target);
2195   socket->open_cb = open_cb;
2196   socket->open_cls = open_cb_cls;
2197   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2198   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2199   
2200   /* Set defaults */
2201   socket->retransmit_timeout = 
2202     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2203
2204   va_start (vargs, open_cb_cls); /* Parse variable args */
2205   do {
2206     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2207     switch (option)
2208       {
2209       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2210         /* Expect struct GNUNET_TIME_Relative */
2211         socket->retransmit_timeout = va_arg (vargs,
2212                                              struct GNUNET_TIME_Relative);
2213         break;
2214       case GNUNET_STREAM_OPTION_END:
2215         break;
2216       }
2217   } while (GNUNET_STREAM_OPTION_END != option);
2218   va_end (vargs);               /* End of variable args parsing */
2219   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2220                                       10,  /* QUEUE size as parameter? */
2221                                       socket, /* cls */
2222                                       NULL, /* No inbound tunnel handler */
2223                                       &tunnel_cleaner, /* FIXME: not required? */
2224                                       client_message_handlers,
2225                                       &app_port); /* We don't get inbound tunnels */
2226   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2227     {
2228       GNUNET_free (socket);
2229       return NULL;
2230     }
2231
2232   /* Now create the mesh tunnel to target */
2233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2234               "Creating MESH Tunnel\n");
2235   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2236                                               NULL, /* Tunnel context */
2237                                               &mesh_peer_connect_callback,
2238                                               &mesh_peer_disconnect_callback,
2239                                               socket);
2240   GNUNET_assert (NULL != socket->tunnel);
2241   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2242                                         target);
2243   
2244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2245               "%s() END\n", __func__);
2246   return socket;
2247 }
2248
2249
2250 /**
2251  * Shutdown the stream for reading or writing (man 2 shutdown).
2252  *
2253  * @param socket the stream socket
2254  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2255  */
2256 void
2257 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2258                         int how)
2259 {
2260   return;
2261 }
2262
2263
2264 /**
2265  * Closes the stream
2266  *
2267  * @param socket the stream socket
2268  */
2269 void
2270 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2271 {
2272   struct MessageQueue *head;
2273
2274   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2275   {
2276     /* socket closed with read task pending!? */
2277     GNUNET_break (0);
2278     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2279     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2280   }
2281
2282   /* Clear Transmit handles */
2283   if (NULL != socket->transmit_handle)
2284     {
2285       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2286       socket->transmit_handle = NULL;
2287     }
2288
2289   /* Clear existing message queue */
2290   while (NULL != (head = socket->queue_head)) {
2291     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2292                                  socket->queue_tail,
2293                                  head);
2294     GNUNET_free (head->message);
2295     GNUNET_free (head);
2296   }
2297
2298   /* Close associated tunnel */
2299   if (NULL != socket->tunnel)
2300     {
2301       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2302       socket->tunnel = NULL;
2303     }
2304
2305   /* Close mesh connection */
2306   if (NULL != socket->mesh && GNUNET_YES != socket->derived)
2307     {
2308       GNUNET_MESH_disconnect (socket->mesh);
2309       socket->mesh = NULL;
2310     }
2311   
2312   /* Release receive buffer */
2313   if (NULL != socket->receive_buffer)
2314     {
2315       GNUNET_free (socket->receive_buffer);
2316     }
2317
2318   GNUNET_free (socket);
2319 }
2320
2321
2322 /**
2323  * Listens for stream connections for a specific application ports
2324  *
2325  * @param cfg the configuration to use
2326  * @param app_port the application port for which new streams will be accepted
2327  * @param listen_cb this function will be called when a peer tries to establish
2328  *            a stream with us
2329  * @param listen_cb_cls closure for listen_cb
2330  * @return listen socket, NULL for any error
2331  */
2332 struct GNUNET_STREAM_ListenSocket *
2333 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2334                       GNUNET_MESH_ApplicationType app_port,
2335                       GNUNET_STREAM_ListenCallback listen_cb,
2336                       void *listen_cb_cls)
2337 {
2338   /* FIXME: Add variable args for passing configration options? */
2339   struct GNUNET_STREAM_ListenSocket *lsocket;
2340   struct GNUNET_PeerIdentity our_peer_id;
2341
2342   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2343   lsocket->port = app_port;
2344   lsocket->listen_cb = listen_cb;
2345   lsocket->listen_cb_cls = listen_cb_cls;
2346   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2347   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2348   lsocket->mesh = GNUNET_MESH_connect (cfg,
2349                                        10, /* FIXME: QUEUE size as parameter? */
2350                                        lsocket, /* Closure */
2351                                        &new_tunnel_notify,
2352                                        &tunnel_cleaner,
2353                                        server_message_handlers,
2354                                        &app_port);
2355   GNUNET_assert (NULL != lsocket->mesh);
2356   return lsocket;
2357 }
2358
2359
2360 /**
2361  * Closes the listen socket
2362  *
2363  * @param lsocket the listen socket
2364  */
2365 void
2366 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2367 {
2368   /* Close MESH connection */
2369   GNUNET_assert (NULL != lsocket->mesh);
2370   GNUNET_MESH_disconnect (lsocket->mesh);
2371   
2372   GNUNET_free (lsocket);
2373 }
2374
2375
2376 /**
2377  * Tries to write the given data to the stream
2378  *
2379  * @param socket the socket representing a stream
2380  * @param data the data buffer from where the data is written into the stream
2381  * @param size the number of bytes to be written from the data buffer
2382  * @param timeout the timeout period
2383  * @param write_cont the function to call upon writing some bytes into the stream
2384  * @param write_cont_cls the closure
2385  * @return handle to cancel the operation
2386  */
2387 struct GNUNET_STREAM_IOWriteHandle *
2388 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2389                      const void *data,
2390                      size_t size,
2391                      struct GNUNET_TIME_Relative timeout,
2392                      GNUNET_STREAM_CompletionContinuation write_cont,
2393                      void *write_cont_cls)
2394 {
2395   unsigned int num_needed_packets;
2396   unsigned int packet;
2397   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2398   uint32_t packet_size;
2399   uint32_t payload_size;
2400   struct GNUNET_STREAM_DataMessage *data_msg;
2401   const void *sweep;
2402   struct GNUNET_TIME_Relative ack_deadline;
2403
2404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2405               "%s\n", __func__);
2406
2407   /* Return NULL if there is already a write request pending */
2408   if (NULL != socket->write_handle)
2409   {
2410     GNUNET_break (0);
2411     return NULL;
2412   }
2413   if (!((STATE_ESTABLISHED == socket->state)
2414         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2415         || (STATE_RECEIVE_CLOSED == socket->state)))
2416     {
2417       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2418                   "%x: Attempting to write on a closed (OR) not-yet-established"
2419                   "stream\n",
2420                   socket->our_id);
2421       return NULL;
2422     } 
2423   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2424     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2425   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2426   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2427   io_handle->write_cont = write_cont;
2428   io_handle->write_cont_cls = write_cont_cls;
2429   io_handle->size = size;
2430   sweep = data;
2431   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2432      determined from RTT */
2433   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2434   /* Divide the given buffer into packets for sending */
2435   for (packet=0; packet < num_needed_packets; packet++)
2436     {
2437       if ((packet + 1) * max_payload_size < size) 
2438         {
2439           payload_size = max_payload_size;
2440           packet_size = MAX_PACKET_SIZE;
2441         }
2442       else 
2443         {
2444           payload_size = size - packet * max_payload_size;
2445           packet_size =  payload_size + sizeof (struct
2446                                                 GNUNET_STREAM_DataMessage); 
2447         }
2448       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2449       io_handle->messages[packet]->header.header.size = htons (packet_size);
2450       io_handle->messages[packet]->header.header.type =
2451         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2452       io_handle->messages[packet]->sequence_number =
2453         htonl (socket->write_sequence_number++);
2454       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2455
2456       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2457          determined from RTT */
2458       io_handle->messages[packet]->ack_deadline =
2459         GNUNET_TIME_relative_hton (ack_deadline);
2460       data_msg = io_handle->messages[packet];
2461       /* Copy data from given buffer to the packet */
2462       memcpy (&data_msg[1],
2463               sweep,
2464               payload_size);
2465       sweep += payload_size;
2466       socket->write_offset += payload_size;
2467     }
2468   socket->write_handle = io_handle;
2469   write_data (socket);
2470
2471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2472               "%s() END\n", __func__);
2473
2474   return io_handle;
2475 }
2476
2477
2478 /**
2479  * Tries to read data from the stream
2480  *
2481  * @param socket the socket representing a stream
2482  * @param timeout the timeout period
2483  * @param proc function to call with data (once only)
2484  * @param proc_cls the closure for proc
2485  * @return handle to cancel the operation
2486  */
2487 struct GNUNET_STREAM_IOReadHandle *
2488 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2489                     struct GNUNET_TIME_Relative timeout,
2490                     GNUNET_STREAM_DataProcessor proc,
2491                     void *proc_cls)
2492 {
2493   struct GNUNET_STREAM_IOReadHandle *read_handle;
2494   
2495   /* Return NULL if there is already a read handle; the user has to cancel that
2496   first before continuing or has to wait until it is completed */
2497   if (NULL != socket->read_handle) return NULL;
2498
2499   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2500   read_handle->proc = proc;
2501   socket->read_handle = read_handle;
2502
2503   /* Check if we have a packet at bitmap 0 */
2504   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2505                                           0))
2506     {
2507       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2508                                                        socket);
2509    
2510     }
2511   
2512   /* Setup the read timeout task */
2513   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2514                                                                &read_io_timeout,
2515                                                                socket);
2516   return read_handle;
2517 }
2518
2519
2520 /**
2521  * Cancel pending write operation.
2522  *
2523  * @param ioh handle to operation to cancel
2524  */
2525 void
2526 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2527 {
2528   return;
2529 }
2530
2531
2532 /**
2533  * Cancel pending read operation.
2534  *
2535  * @param ioh handle to operation to cancel
2536  */
2537 void
2538 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2539 {
2540   return;
2541 }