- new program_run and run_2
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  *
23  * Checks for matching the sender and socket->other_peer in server
24  * message handlers  
25  *
26  * Decrement PEER intern count during socket close and listen close to free the
27  * memory used for PEER interning
28  **/
29
30 /**
31  * @file stream/stream_api.c
32  * @brief Implementation of the stream library
33  * @author Sree Harsha Totakura
34  */
35 #include "platform.h"
36 #include "gnunet_common.h"
37 #include "gnunet_crypto_lib.h"
38 #include "gnunet_stream_lib.h"
39 #include "gnunet_testing_lib.h"
40 #include "stream_protocol.h"
41
42
43 /**
44  * The maximum packet size of a stream packet
45  */
46 #define MAX_PACKET_SIZE 64000
47
48 /**
49  * The maximum payload a data message packet can carry
50  */
51 static size_t max_payload_size = 
52   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53
54 /**
55  * Receive buffer
56  */
57 #define RECEIVE_BUFFER_SIZE 4096000
58
59 /**
60  * states in the Protocol
61  */
62 enum State
63   {
64     /**
65      * Client initialization state
66      */
67     STATE_INIT,
68
69     /**
70      * Listener initialization state 
71      */
72     STATE_LISTEN,
73
74     /**
75      * Pre-connection establishment state
76      */
77     STATE_HELLO_WAIT,
78
79     /**
80      * State where a connection has been established
81      */
82     STATE_ESTABLISHED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_RECEIVE_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for reading
91      */
92     STATE_RECEIVE_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_TRANSMIT_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed for writing
101      */
102     STATE_TRANSMIT_CLOSED,
103
104     /**
105      * State where the socket is closed on our side and waiting to be ACK'ed
106      */
107     STATE_CLOSE_WAIT,
108
109     /**
110      * State where the socket is closed
111      */
112     STATE_CLOSED 
113   };
114
115
116 /**
117  * Functions of this type are called when a message is written
118  *
119  * @param cls the closure from queue_message
120  * @param socket the socket the written message was bound to
121  */
122 typedef void (*SendFinishCallback) (void *cls,
123                                     struct GNUNET_STREAM_Socket *socket);
124
125
126 /**
127  * The send message queue
128  */
129 struct MessageQueue
130 {
131   /**
132    * The message
133    */
134   struct GNUNET_STREAM_MessageHeader *message;
135
136   /**
137    * Callback to be called when the message is sent
138    */
139   SendFinishCallback finish_cb;
140
141   /**
142    * The closure for finish_cb
143    */
144   void *finish_cb_cls;
145
146   /**
147    * The next message in queue. Should be NULL in the last message
148    */
149   struct MessageQueue *next;
150
151   /**
152    * The next message in queue. Should be NULL in the first message
153    */
154   struct MessageQueue *prev;
155 };
156
157
158 /**
159  * The STREAM Socket Handler
160  */
161 struct GNUNET_STREAM_Socket
162 {
163   /**
164    * Retransmission timeout
165    */
166   struct GNUNET_TIME_Relative retransmit_timeout;
167
168   /**
169    * The Acknowledgement Bitmap
170    */
171   GNUNET_STREAM_AckBitmap ack_bitmap;
172
173   /**
174    * Time when the Acknowledgement was queued
175    */
176   struct GNUNET_TIME_Absolute ack_time_registered;
177
178   /**
179    * Queued Acknowledgement deadline
180    */
181   struct GNUNET_TIME_Relative ack_time_deadline;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOWriteHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOReadHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * Task identifier for the read io timeout task
235    */
236   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
237
238   /**
239    * Task identifier for retransmission task after timeout
240    */
241   GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
242
243   /**
244    * The task for sending timely Acks
245    */
246   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
247
248   /**
249    * Task scheduled to continue a read operation.
250    */
251   GNUNET_SCHEDULER_TaskIdentifier read_task_id;
252
253   /**
254    * The state of the protocol associated with this socket
255    */
256   enum State state;
257
258   /**
259    * The status of the socket
260    */
261   enum GNUNET_STREAM_Status status;
262
263   /**
264    * The number of previous timeouts; FIXME: currently not used
265    */
266   unsigned int retries;
267
268   /**
269    * Is this socket derived from listen socket?
270    */
271   unsigned int derived;
272   
273   /**
274    * The peer identity of the peer at the other end of the stream
275    */
276   GNUNET_PEER_Id other_peer;
277
278   /**
279    * Our Peer Identity (for debugging)
280    */
281   GNUNET_PEER_Id our_id;
282
283   /**
284    * The application port number (type: uint32_t)
285    */
286   GNUNET_MESH_ApplicationType app_port;
287
288   /**
289    * The session id associated with this stream connection
290    * FIXME: Not used currently, may be removed
291    */
292   uint32_t session_id;
293
294   /**
295    * Write sequence number. Set to random when sending HELLO(client) and
296    * HELLO_ACK(server) 
297    */
298   uint32_t write_sequence_number;
299
300   /**
301    * Read sequence number. This number's value is determined during handshake
302    */
303   uint32_t read_sequence_number;
304
305   /**
306    * The receiver buffer size
307    */
308   uint32_t receive_buffer_size;
309
310   /**
311    * The receiver buffer boundaries
312    */
313   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
314
315   /**
316    * receiver's available buffer after the last acknowledged packet
317    */
318   uint32_t receiver_window_available;
319
320   /**
321    * The offset pointer used during write operation
322    */
323   uint32_t write_offset;
324
325   /**
326    * The offset after which we are expecting data
327    */
328   uint32_t read_offset;
329
330   /**
331    * The offset upto which user has read from the received buffer
332    */
333   uint32_t copy_offset;
334 };
335
336
337 /**
338  * A socket for listening
339  */
340 struct GNUNET_STREAM_ListenSocket
341 {
342   /**
343    * The mesh handle
344    */
345   struct GNUNET_MESH_Handle *mesh;
346
347   /**
348    * The callback function which is called after successful opening socket
349    */
350   GNUNET_STREAM_ListenCallback listen_cb;
351
352   /**
353    * The call back closure
354    */
355   void *listen_cb_cls;
356
357   /**
358    * Our interned Peer's identity
359    */
360   GNUNET_PEER_Id our_id;
361
362   /**
363    * The service port
364    * FIXME: Remove if not required!
365    */
366   GNUNET_MESH_ApplicationType port;
367 };
368
369
370 /**
371  * The IO Write Handle
372  */
373 struct GNUNET_STREAM_IOWriteHandle
374 {
375   /**
376    * The packet_buffers associated with this Handle
377    */
378   struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
379
380   /**
381    * The write continuation callback
382    */
383   GNUNET_STREAM_CompletionContinuation write_cont;
384
385   /**
386    * Write continuation closure
387    */
388   void *write_cont_cls;
389
390   /**
391    * The bitmap of this IOHandle; Corresponding bit for a message is set when
392    * it has been acknowledged by the receiver
393    */
394   GNUNET_STREAM_AckBitmap ack_bitmap;
395
396   /**
397    * Number of bytes in this write handle
398    */
399   size_t size;
400
401   /**
402    * Number of packets sent before waiting for an ack
403    *
404    * FIXME: Do we need this?
405    */
406   unsigned int sent_packets;
407 };
408
409
410 /**
411  * The IO Read Handle
412  */
413 struct GNUNET_STREAM_IOReadHandle
414 {
415   /**
416    * Callback for the read processor
417    */
418   GNUNET_STREAM_DataProcessor proc;
419
420   /**
421    * The closure pointer for the read processor callback
422    */
423   void *proc_cls;
424 };
425
426
427 /**
428  * Default value in seconds for various timeouts
429  */
430 static unsigned int default_timeout = 10;
431
432 /**
433  * Callback function for sending hello message
434  *
435  * @param cls closure the socket
436  * @param size number of bytes available in buf
437  * @param buf where the callee should write the message
438  * @return number of bytes written to buf
439  */
440 static size_t
441 send_message_notify (void *cls, size_t size, void *buf)
442 {
443   struct GNUNET_STREAM_Socket *socket = cls;
444   struct GNUNET_PeerIdentity target;
445   struct MessageQueue *head;
446   size_t ret;
447
448   socket->transmit_handle = NULL; /* Remove the transmit handle */
449   head = socket->queue_head;
450   if (NULL == head)
451     return 0; /* just to be safe */
452   GNUNET_PEER_resolve (socket->other_peer, &target);
453   if (0 == size)                /* request timed out */
454     {
455       socket->retries++;
456       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457                   "Message sending timed out. Retry %d \n",
458                   socket->retries);
459       socket->transmit_handle = 
460         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
461                                            0, /* Corking */
462                                            1, /* Priority */
463                                            /* FIXME: exponential backoff */
464                                            socket->retransmit_timeout,
465                                            &target,
466                                            ntohs (head->message->header.size),
467                                            &send_message_notify,
468                                            socket);
469       return 0;
470     }
471
472   ret = ntohs (head->message->header.size);
473   GNUNET_assert (size >= ret);
474   memcpy (buf, head->message, ret);
475   if (NULL != head->finish_cb)
476     {
477       head->finish_cb (head->finish_cb_cls, socket);
478     }
479   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
480                                socket->queue_tail,
481                                head);
482   GNUNET_free (head->message);
483   GNUNET_free (head);
484   head = socket->queue_head;
485   if (NULL != head)    /* more pending messages to send */
486     {
487       socket->retries = 0;
488       socket->transmit_handle = 
489         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
490                                            0, /* Corking */
491                                            1, /* Priority */
492                                            /* FIXME: exponential backoff */
493                                            socket->retransmit_timeout,
494                                            &target,
495                                            ntohs (head->message->header.size),
496                                            &send_message_notify,
497                                            socket);
498     }
499   return ret;
500 }
501
502
503 /**
504  * Queues a message for sending using the mesh connection of a socket
505  *
506  * @param socket the socket whose mesh connection is used
507  * @param message the message to be sent
508  * @param finish_cb the callback to be called when the message is sent
509  * @param finish_cb_cls the closure for the callback
510  */
511 static void
512 queue_message (struct GNUNET_STREAM_Socket *socket,
513                struct GNUNET_STREAM_MessageHeader *message,
514                SendFinishCallback finish_cb,
515                void *finish_cb_cls)
516 {
517   struct MessageQueue *queue_entity;
518   struct GNUNET_PeerIdentity target;
519
520   GNUNET_assert 
521     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
522      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
523
524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525               "%x: Queueing message of type %d and size %d\n",
526               socket->our_id,
527               ntohs (message->header.type),
528               ntohs (message->header.size));
529   GNUNET_assert (NULL != message);
530   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
531   queue_entity->message = message;
532   queue_entity->finish_cb = finish_cb;
533   queue_entity->finish_cb_cls = finish_cb_cls;
534   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
535                                     socket->queue_tail,
536                                     queue_entity);
537   if (NULL == socket->transmit_handle)
538   {
539     socket->retries = 0;
540     GNUNET_PEER_resolve (socket->other_peer, &target);
541     socket->transmit_handle = 
542       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
543                                          0, /* Corking */
544                                          1, /* Priority */
545                                          socket->retransmit_timeout,
546                                          &target,
547                                          ntohs (message->header.size),
548                                          &send_message_notify,
549                                          socket);
550   }
551 }
552
553
554 /**
555  * Copies a message and queues it for sending using the mesh connection of
556  * given socket 
557  *
558  * @param socket the socket whose mesh connection is used
559  * @param message the message to be sent
560  * @param finish_cb the callback to be called when the message is sent
561  * @param finish_cb_cls the closure for the callback
562  */
563 static void
564 copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
565                         const struct GNUNET_STREAM_MessageHeader *message,
566                         SendFinishCallback finish_cb,
567                         void *finish_cb_cls)
568 {
569   struct GNUNET_STREAM_MessageHeader *msg_copy;
570   uint16_t size;
571   
572   size = ntohs (message->header.size);
573   msg_copy = GNUNET_malloc (size);
574   memcpy (msg_copy, message, size);
575   queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
576 }
577
578
579 /**
580  * Callback function for sending ack message
581  *
582  * @param cls closure the ACK message created in ack_task
583  * @param size number of bytes available in buffer
584  * @param buf where the callee should write the message
585  * @return number of bytes written to buf
586  */
587 static size_t
588 send_ack_notify (void *cls, size_t size, void *buf)
589 {
590   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
591
592   if (0 == size)
593     {
594       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595                   "%s called with size 0\n", __func__);
596       return 0;
597     }
598   GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
599   
600   size = ntohs (ack_msg->header.header.size);
601   memcpy (buf, ack_msg, size);
602   return size;
603 }
604
605 /**
606  * Writes data using the given socket. The amount of data written is limited by
607  * the receiver_window_size
608  *
609  * @param socket the socket to use
610  */
611 static void 
612 write_data (struct GNUNET_STREAM_Socket *socket);
613
614 /**
615  * Task for retransmitting data messages if they aren't ACK before their ack
616  * deadline 
617  *
618  * @param cls the socket
619  * @param tc the Task context
620  */
621 static void
622 retransmission_timeout_task (void *cls,
623                              const struct GNUNET_SCHEDULER_TaskContext *tc)
624 {
625   struct GNUNET_STREAM_Socket *socket = cls;
626   
627   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
628     return;
629
630   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
631               "%x: Retransmitting DATA...\n", socket->our_id);
632   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
633   write_data (socket);
634 }
635
636
637 /**
638  * Task for sending ACK message
639  *
640  * @param cls the socket
641  * @param tc the Task context
642  */
643 static void
644 ack_task (void *cls,
645           const struct GNUNET_SCHEDULER_TaskContext *tc)
646 {
647   struct GNUNET_STREAM_Socket *socket = cls;
648   struct GNUNET_STREAM_AckMessage *ack_msg;
649   struct GNUNET_PeerIdentity target;
650
651   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
652     {
653       return;
654     }
655
656   socket->ack_task_id = 0;
657
658   /* Create the ACK Message */
659   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
660   ack_msg->header.header.size = htons (sizeof (struct 
661                                                GNUNET_STREAM_AckMessage));
662   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
663   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
664   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
665   ack_msg->receive_window_remaining = 
666     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
667
668   GNUNET_PEER_resolve (socket->other_peer, &target);
669   /* Request MESH for sending ACK */
670   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
671                                      0, /* Corking */
672                                      1, /* Priority */
673                                      socket->retransmit_timeout,
674                                      &target,
675                                      ntohs (ack_msg->header.header.size),
676                                      &send_ack_notify,
677                                      ack_msg);
678
679   
680 }
681
682
683 /**
684  * Function to modify a bit in GNUNET_STREAM_AckBitmap
685  *
686  * @param bitmap the bitmap to modify
687  * @param bit the bit number to modify
688  * @param value GNUNET_YES to on, GNUNET_NO to off
689  */
690 static void
691 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
692                       unsigned int bit, 
693                       int value)
694 {
695   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
696   if (GNUNET_YES == value)
697     *bitmap |= (1LL << bit);
698   else
699     *bitmap &= ~(1LL << bit);
700 }
701
702
703 /**
704  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
705  *
706  * @param bitmap address of the bitmap that has to be checked
707  * @param bit the bit number to check
708  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
709  */
710 static uint8_t
711 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
712                       unsigned int bit)
713 {
714   GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
715   return 0 != (*bitmap & (1LL << bit));
716 }
717
718
719
720 /**
721  * Function called when Data Message is sent
722  *
723  * @param cls the io_handle corresponding to the Data Message
724  * @param socket the socket which was used
725  */
726 static void
727 write_data_finish_cb (void *cls,
728                       struct GNUNET_STREAM_Socket *socket)
729 {
730   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
731
732   io_handle->sent_packets++;
733 }
734
735
736 /**
737  * Writes data using the given socket. The amount of data written is limited by
738  * the receiver_window_size
739  *
740  * @param socket the socket to use
741  */
742 static void 
743 write_data (struct GNUNET_STREAM_Socket *socket)
744 {
745   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
746   int packet;                   /* Although an int, should never be negative */
747   int ack_packet;
748
749   ack_packet = -1;
750   /* Find the last acknowledged packet */
751   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
752     {      
753       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
754                                               packet))
755         ack_packet = packet;        
756       else if (NULL == io_handle->messages[packet])
757         break;
758     }
759   /* Resend packets which weren't ack'ed */
760   for (packet=0; packet < ack_packet; packet++)
761     {
762       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
763                                              packet))
764         {
765           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766                       "%x: Placing DATA message with sequence %u in send queue\n",
767                       socket->our_id,
768                       ntohl (io_handle->messages[packet]->sequence_number));
769
770           copy_and_queue_message (socket,
771                                   &io_handle->messages[packet]->header,
772                                   NULL,
773                                   NULL);
774         }
775     }
776   packet = ack_packet + 1;
777   /* Now send new packets if there is enough buffer space */
778   while ( (NULL != io_handle->messages[packet]) &&
779           (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
780     {
781       socket->receiver_window_available -= 
782         ntohs (io_handle->messages[packet]->header.header.size);
783       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784                   "%x: Placing DATA message with sequence %u in send queue\n",
785                   socket->our_id,
786                   ntohl (io_handle->messages[packet]->sequence_number));
787       copy_and_queue_message (socket,
788                               &io_handle->messages[packet]->header,
789                               &write_data_finish_cb,
790                               io_handle);
791       packet++;
792     }
793
794   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
795     socket->retransmission_timeout_task_id = 
796       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
797                                     (GNUNET_TIME_UNIT_SECONDS, 5),
798                                     &retransmission_timeout_task,
799                                     socket);
800 }
801
802
803 /**
804  * Task for calling the read processor
805  *
806  * @param cls the socket
807  * @param tc the task context
808  */
809 static void
810 call_read_processor (void *cls,
811                           const struct GNUNET_SCHEDULER_TaskContext *tc)
812 {
813   struct GNUNET_STREAM_Socket *socket = cls;
814   size_t read_size;
815   size_t valid_read_size;
816   unsigned int packet;
817   uint32_t sequence_increase;
818   uint32_t offset_increase;
819
820   socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
821   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
822     return;
823
824   GNUNET_assert (NULL != socket->read_handle);
825   GNUNET_assert (NULL != socket->read_handle->proc);
826
827   /* Check the bitmap for any holes */
828   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
829     {
830       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
831                                              packet))
832         break;
833     }
834   /* We only call read processor if we have the first packet */
835   GNUNET_assert (0 < packet);
836
837   valid_read_size = 
838     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
839
840   GNUNET_assert (0 != valid_read_size);
841
842   /* Cancel the read_io_timeout_task */
843   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
844   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
845
846   /* Call the data processor */
847   read_size = 
848     socket->read_handle->proc (socket->read_handle->proc_cls,
849                                socket->status,
850                                socket->receive_buffer + socket->copy_offset,
851                                valid_read_size);
852   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853               "%x: Read processor completed successfully\n",
854               socket->our_id);
855
856   /* Free the read handle */
857   GNUNET_free (socket->read_handle);
858   socket->read_handle = NULL;
859
860   GNUNET_assert (read_size <= valid_read_size);
861   socket->copy_offset += read_size;
862
863   /* Determine upto which packet we can remove from the buffer */
864   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
865     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
866       break;
867
868   /* If no packets can be removed we can't move the buffer */
869   if (0 == packet) return;
870
871   sequence_increase = packet;
872
873   /* Shift the data in the receive buffer */
874   memmove (socket->receive_buffer,
875            socket->receive_buffer 
876            + socket->receive_buffer_boundaries[sequence_increase-1],
877            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
878   
879   /* Shift the bitmap */
880   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
881   
882   /* Set read_sequence_number */
883   socket->read_sequence_number += sequence_increase;
884   
885   /* Set read_offset */
886   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
887   socket->read_offset += offset_increase;
888
889   /* Fix copy_offset */
890   GNUNET_assert (offset_increase <= socket->copy_offset);
891   socket->copy_offset -= offset_increase;
892   
893   /* Fix relative boundaries */
894   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
895     {
896       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
897         {
898           socket->receive_buffer_boundaries[packet] = 
899             socket->receive_buffer_boundaries[packet + sequence_increase] 
900             - offset_increase;
901         }
902       else
903         socket->receive_buffer_boundaries[packet] = 0;
904     }
905 }
906
907
908 /**
909  * Cancels the existing read io handle
910  *
911  * @param cls the closure from the SCHEDULER call
912  * @param tc the task context
913  */
914 static void
915 read_io_timeout (void *cls, 
916                 const struct GNUNET_SCHEDULER_TaskContext *tc)
917 {
918   struct GNUNET_STREAM_Socket *socket = cls;
919
920   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
921   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
922   {
923     GNUNET_SCHEDULER_cancel (socket->read_task_id);
924     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
925   }
926   GNUNET_assert (NULL != socket->read_handle);
927   
928   GNUNET_free (socket->read_handle);
929   socket->read_handle = NULL;
930 }
931
932
933 /**
934  * Handler for DATA messages; Same for both client and server
935  *
936  * @param socket the socket through which the ack was received
937  * @param tunnel connection to the other end
938  * @param sender who sent the message
939  * @param msg the data message
940  * @param atsi performance data for the connection
941  * @return GNUNET_OK to keep the connection open,
942  *         GNUNET_SYSERR to close it (signal serious error)
943  */
944 static int
945 handle_data (struct GNUNET_STREAM_Socket *socket,
946              struct GNUNET_MESH_Tunnel *tunnel,
947              const struct GNUNET_PeerIdentity *sender,
948              const struct GNUNET_STREAM_DataMessage *msg,
949              const struct GNUNET_ATS_Information*atsi)
950 {
951   const void *payload;
952   uint32_t bytes_needed;
953   uint32_t relative_offset;
954   uint32_t relative_sequence_number;
955   uint16_t size;
956
957   size = htons (msg->header.header.size);
958   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
959     {
960       GNUNET_break_op (0);
961       return GNUNET_SYSERR;
962     }
963
964   if (GNUNET_PEER_search (sender) != socket->other_peer)
965     {
966       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967                   "%x: Received DATA from non-confirming peer\n",
968                   socket->our_id);
969       return GNUNET_YES;
970     }
971
972   switch (socket->state)
973     {
974     case STATE_ESTABLISHED:
975     case STATE_TRANSMIT_CLOSED:
976     case STATE_TRANSMIT_CLOSE_WAIT:
977       
978       /* check if the message's sequence number is in the range we are
979          expecting */
980       relative_sequence_number = 
981         ntohl (msg->sequence_number) - socket->read_sequence_number;
982       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
983         {
984           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                       "%x: Ignoring received message with sequence number %u\n",
986                       socket->our_id,
987                       ntohl (msg->sequence_number));
988           return GNUNET_YES;
989         }
990
991       /* Check if we have already seen this message */
992       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
993                                               relative_sequence_number))
994         {
995           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996                       "%x: Ignoring already received message with sequence "
997                       "number %u\n",
998                       socket->our_id,
999                       ntohl (msg->sequence_number));
1000           return GNUNET_YES;
1001         }
1002
1003       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004                   "%x: Receiving DATA with sequence number: %u and size: %d "
1005                   "from %x\n",
1006                   socket->our_id,
1007                   ntohl (msg->sequence_number),
1008                   ntohs (msg->header.header.size),
1009                   socket->other_peer);
1010
1011       /* Check if we have to allocate the buffer */
1012       size -= sizeof (struct GNUNET_STREAM_DataMessage);
1013       relative_offset = ntohl (msg->offset) - socket->read_offset;
1014       bytes_needed = relative_offset + size;
1015       if (bytes_needed > socket->receive_buffer_size)
1016         {
1017           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1018             {
1019               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1020                                                        bytes_needed);
1021               socket->receive_buffer_size = bytes_needed;
1022             }
1023           else
1024             {
1025               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026                           "%x: Cannot accommodate packet %d as buffer is",
1027                           "full\n",
1028                           socket->our_id,
1029                           ntohl (msg->sequence_number));
1030               return GNUNET_YES;
1031             }
1032         }
1033       
1034       /* Copy Data to buffer */
1035       payload = &msg[1];
1036       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1037       memcpy (socket->receive_buffer + relative_offset,
1038               payload,
1039               size);
1040       socket->receive_buffer_boundaries[relative_sequence_number] = 
1041         relative_offset + size;
1042       
1043       /* Modify the ACK bitmap */
1044       ackbitmap_modify_bit (&socket->ack_bitmap,
1045                             relative_sequence_number,
1046                             GNUNET_YES);
1047
1048       /* Start ACK sending task if one is not already present */
1049       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1050        {
1051          socket->ack_task_id = 
1052            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1053                                          (msg->ack_deadline),
1054                                          &ack_task,
1055                                          socket);
1056        }
1057
1058       if ((NULL != socket->read_handle) /* A read handle is waiting */
1059           /* There is no current read task */
1060           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1061           /* We have the first packet */
1062           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1063                                                  0)))
1064         {
1065           socket->read_task_id = 
1066             GNUNET_SCHEDULER_add_now (&call_read_processor,
1067                                       socket);
1068         }
1069       
1070       break;
1071
1072     default:
1073       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074                   "%x: Received data message when it cannot be handled\n",
1075                   socket->our_id);
1076       break;
1077     }
1078   return GNUNET_YES;
1079 }
1080
1081 /**
1082  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1083  *
1084  * @param cls the socket (set from GNUNET_MESH_connect)
1085  * @param tunnel connection to the other end
1086  * @param tunnel_ctx place to store local state associated with the tunnel
1087  * @param sender who sent the message
1088  * @param message the actual message
1089  * @param atsi performance data for the connection
1090  * @return GNUNET_OK to keep the connection open,
1091  *         GNUNET_SYSERR to close it (signal serious error)
1092  */
1093 static int
1094 client_handle_data (void *cls,
1095              struct GNUNET_MESH_Tunnel *tunnel,
1096              void **tunnel_ctx,
1097              const struct GNUNET_PeerIdentity *sender,
1098              const struct GNUNET_MessageHeader *message,
1099              const struct GNUNET_ATS_Information*atsi)
1100 {
1101   struct GNUNET_STREAM_Socket *socket = cls;
1102
1103   return handle_data (socket, 
1104                       tunnel, 
1105                       sender, 
1106                       (const struct GNUNET_STREAM_DataMessage *) message, 
1107                       atsi);
1108 }
1109
1110
1111 /**
1112  * Callback to set state to ESTABLISHED
1113  *
1114  * @param cls the closure from queue_message FIXME: document
1115  * @param socket the socket to requiring state change
1116  */
1117 static void
1118 set_state_established (void *cls,
1119                        struct GNUNET_STREAM_Socket *socket)
1120 {
1121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1122               "%x: Attaining ESTABLISHED state\n",
1123               socket->our_id);
1124   socket->write_offset = 0;
1125   socket->read_offset = 0;
1126   socket->state = STATE_ESTABLISHED;
1127   if (socket->open_cb)
1128     socket->open_cb (socket->open_cls, socket);
1129 }
1130
1131
1132 /**
1133  * Callback to set state to HELLO_WAIT
1134  *
1135  * @param cls the closure from queue_message
1136  * @param socket the socket to requiring state change
1137  */
1138 static void
1139 set_state_hello_wait (void *cls,
1140                       struct GNUNET_STREAM_Socket *socket)
1141 {
1142   GNUNET_assert (STATE_INIT == socket->state);
1143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1144               "%x: Attaining HELLO_WAIT state\n",
1145               socket->our_id);
1146   socket->state = STATE_HELLO_WAIT;
1147 }
1148
1149
1150 /**
1151  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1152  *
1153  * @param cls the socket (set from GNUNET_MESH_connect)
1154  * @param tunnel connection to the other end
1155  * @param tunnel_ctx this is NULL
1156  * @param sender who sent the message
1157  * @param message the actual message
1158  * @param atsi performance data for the connection
1159  * @return GNUNET_OK to keep the connection open,
1160  *         GNUNET_SYSERR to close it (signal serious error)
1161  */
1162 static int
1163 client_handle_hello_ack (void *cls,
1164                          struct GNUNET_MESH_Tunnel *tunnel,
1165                          void **tunnel_ctx,
1166                          const struct GNUNET_PeerIdentity *sender,
1167                          const struct GNUNET_MessageHeader *message,
1168                          const struct GNUNET_ATS_Information*atsi)
1169 {
1170   struct GNUNET_STREAM_Socket *socket = cls;
1171   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1172   struct GNUNET_STREAM_HelloAckMessage *reply;
1173
1174   if (GNUNET_PEER_search (sender) != socket->other_peer)
1175     {
1176       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177                   "%x: Received HELLO_ACK from non-confirming peer\n",
1178                   socket->our_id);
1179       return GNUNET_YES;
1180     }
1181   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1182   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1183               "%x: Received HELLO_ACK from %x\n",
1184               socket->our_id,
1185               socket->other_peer);
1186
1187   GNUNET_assert (socket->tunnel == tunnel);
1188   switch (socket->state)
1189   {
1190   case STATE_HELLO_WAIT:
1191     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1192     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193                 "%x: Read sequence number %u\n",
1194                 socket->our_id,
1195                 (unsigned int) socket->read_sequence_number);
1196     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1197     /* Get the random sequence number */
1198     socket->write_sequence_number = 
1199       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1200       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201                   "%x: Generated write sequence number %u\n",
1202                   socket->our_id,
1203                   (unsigned int) socket->write_sequence_number);
1204     reply = 
1205       GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1206     reply->header.header.size = 
1207       htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1208     reply->header.header.type = 
1209       htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1210     reply->sequence_number = htonl (socket->write_sequence_number);
1211     reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1212     queue_message (socket, 
1213                    &reply->header, 
1214                    &set_state_established, 
1215                    NULL);      
1216     return GNUNET_OK;
1217   case STATE_ESTABLISHED:
1218   case STATE_RECEIVE_CLOSE_WAIT:
1219     // call statistics (# ACKs ignored++)
1220     return GNUNET_OK;
1221   case STATE_INIT:
1222   default:
1223     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224                 "%x: Server %x sent HELLO_ACK when in state %d\n", 
1225                 socket->our_id,
1226                 socket->other_peer,
1227                 socket->state);
1228     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1229     return GNUNET_SYSERR;
1230   }
1231
1232 }
1233
1234
1235 /**
1236  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1237  *
1238  * @param cls the socket (set from GNUNET_MESH_connect)
1239  * @param tunnel connection to the other end
1240  * @param tunnel_ctx this is NULL
1241  * @param sender who sent the message
1242  * @param message the actual message
1243  * @param atsi performance data for the connection
1244  * @return GNUNET_OK to keep the connection open,
1245  *         GNUNET_SYSERR to close it (signal serious error)
1246  */
1247 static int
1248 client_handle_reset (void *cls,
1249                      struct GNUNET_MESH_Tunnel *tunnel,
1250                      void **tunnel_ctx,
1251                      const struct GNUNET_PeerIdentity *sender,
1252                      const struct GNUNET_MessageHeader *message,
1253                      const struct GNUNET_ATS_Information*atsi)
1254 {
1255   struct GNUNET_STREAM_Socket *socket = cls;
1256
1257   return GNUNET_OK;
1258 }
1259
1260
1261 /**
1262  * Common message handler for handling TRANSMIT_CLOSE messages
1263  *
1264  * @param socket the socket through which the ack was received
1265  * @param tunnel connection to the other end
1266  * @param sender who sent the message
1267  * @param msg the transmit close message
1268  * @param atsi performance data for the connection
1269  * @return GNUNET_OK to keep the connection open,
1270  *         GNUNET_SYSERR to close it (signal serious error)
1271  */
1272 static int
1273 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1274                        struct GNUNET_MESH_Tunnel *tunnel,
1275                        const struct GNUNET_PeerIdentity *sender,
1276                        const struct GNUNET_STREAM_MessageHeader *msg,
1277                        const struct GNUNET_ATS_Information*atsi)
1278 {
1279   struct GNUNET_STREAM_MessageHeader *reply;
1280
1281   switch (socket->state)
1282     {
1283     case STATE_ESTABLISHED:
1284       socket->state = STATE_RECEIVE_CLOSED;
1285
1286       /* Send TRANSMIT_CLOSE_ACK */
1287       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1288       reply->header.type = 
1289         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1290       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1291       queue_message (socket, reply, NULL, NULL);
1292       break;
1293
1294     default:
1295       /* FIXME: Call statistics? */
1296       break;
1297     }
1298   return GNUNET_YES;
1299 }
1300
1301
1302 /**
1303  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1304  *
1305  * @param cls the socket (set from GNUNET_MESH_connect)
1306  * @param tunnel connection to the other end
1307  * @param tunnel_ctx this is NULL
1308  * @param sender who sent the message
1309  * @param message the actual message
1310  * @param atsi performance data for the connection
1311  * @return GNUNET_OK to keep the connection open,
1312  *         GNUNET_SYSERR to close it (signal serious error)
1313  */
1314 static int
1315 client_handle_transmit_close (void *cls,
1316                               struct GNUNET_MESH_Tunnel *tunnel,
1317                               void **tunnel_ctx,
1318                               const struct GNUNET_PeerIdentity *sender,
1319                               const struct GNUNET_MessageHeader *message,
1320                               const struct GNUNET_ATS_Information*atsi)
1321 {
1322   struct GNUNET_STREAM_Socket *socket = cls;
1323   
1324   return handle_transmit_close (socket,
1325                                 tunnel,
1326                                 sender,
1327                                 (struct GNUNET_STREAM_MessageHeader *)message,
1328                                 atsi);
1329 }
1330
1331
1332 /**
1333  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1334  *
1335  * @param cls the socket (set from GNUNET_MESH_connect)
1336  * @param tunnel connection to the other end
1337  * @param tunnel_ctx this is NULL
1338  * @param sender who sent the message
1339  * @param message the actual message
1340  * @param atsi performance data for the connection
1341  * @return GNUNET_OK to keep the connection open,
1342  *         GNUNET_SYSERR to close it (signal serious error)
1343  */
1344 static int
1345 client_handle_transmit_close_ack (void *cls,
1346                                   struct GNUNET_MESH_Tunnel *tunnel,
1347                                   void **tunnel_ctx,
1348                                   const struct GNUNET_PeerIdentity *sender,
1349                                   const struct GNUNET_MessageHeader *message,
1350                                   const struct GNUNET_ATS_Information*atsi)
1351 {
1352   struct GNUNET_STREAM_Socket *socket = cls;
1353
1354   return GNUNET_OK;
1355 }
1356
1357
1358 /**
1359  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1360  *
1361  * @param cls the socket (set from GNUNET_MESH_connect)
1362  * @param tunnel connection to the other end
1363  * @param tunnel_ctx this is NULL
1364  * @param sender who sent the message
1365  * @param message the actual message
1366  * @param atsi performance data for the connection
1367  * @return GNUNET_OK to keep the connection open,
1368  *         GNUNET_SYSERR to close it (signal serious error)
1369  */
1370 static int
1371 client_handle_receive_close (void *cls,
1372                              struct GNUNET_MESH_Tunnel *tunnel,
1373                              void **tunnel_ctx,
1374                              const struct GNUNET_PeerIdentity *sender,
1375                              const struct GNUNET_MessageHeader *message,
1376                              const struct GNUNET_ATS_Information*atsi)
1377 {
1378   struct GNUNET_STREAM_Socket *socket = cls;
1379
1380   return GNUNET_OK;
1381 }
1382
1383
1384 /**
1385  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1386  *
1387  * @param cls the socket (set from GNUNET_MESH_connect)
1388  * @param tunnel connection to the other end
1389  * @param tunnel_ctx this is NULL
1390  * @param sender who sent the message
1391  * @param message the actual message
1392  * @param atsi performance data for the connection
1393  * @return GNUNET_OK to keep the connection open,
1394  *         GNUNET_SYSERR to close it (signal serious error)
1395  */
1396 static int
1397 client_handle_receive_close_ack (void *cls,
1398                                  struct GNUNET_MESH_Tunnel *tunnel,
1399                                  void **tunnel_ctx,
1400                                  const struct GNUNET_PeerIdentity *sender,
1401                                  const struct GNUNET_MessageHeader *message,
1402                                  const struct GNUNET_ATS_Information*atsi)
1403 {
1404   struct GNUNET_STREAM_Socket *socket = cls;
1405
1406   return GNUNET_OK;
1407 }
1408
1409
1410 /**
1411  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1412  *
1413  * @param cls the socket (set from GNUNET_MESH_connect)
1414  * @param tunnel connection to the other end
1415  * @param tunnel_ctx this is NULL
1416  * @param sender who sent the message
1417  * @param message the actual message
1418  * @param atsi performance data for the connection
1419  * @return GNUNET_OK to keep the connection open,
1420  *         GNUNET_SYSERR to close it (signal serious error)
1421  */
1422 static int
1423 client_handle_close (void *cls,
1424                      struct GNUNET_MESH_Tunnel *tunnel,
1425                      void **tunnel_ctx,
1426                      const struct GNUNET_PeerIdentity *sender,
1427                      const struct GNUNET_MessageHeader *message,
1428                      const struct GNUNET_ATS_Information*atsi)
1429 {
1430   struct GNUNET_STREAM_Socket *socket = cls;
1431
1432   return GNUNET_OK;
1433 }
1434
1435
1436 /**
1437  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1438  *
1439  * @param cls the socket (set from GNUNET_MESH_connect)
1440  * @param tunnel connection to the other end
1441  * @param tunnel_ctx this is NULL
1442  * @param sender who sent the message
1443  * @param message the actual message
1444  * @param atsi performance data for the connection
1445  * @return GNUNET_OK to keep the connection open,
1446  *         GNUNET_SYSERR to close it (signal serious error)
1447  */
1448 static int
1449 client_handle_close_ack (void *cls,
1450                          struct GNUNET_MESH_Tunnel *tunnel,
1451                          void **tunnel_ctx,
1452                          const struct GNUNET_PeerIdentity *sender,
1453                          const struct GNUNET_MessageHeader *message,
1454                          const struct GNUNET_ATS_Information*atsi)
1455 {
1456   struct GNUNET_STREAM_Socket *socket = cls;
1457
1458   return GNUNET_OK;
1459 }
1460
1461 /*****************************/
1462 /* Server's Message Handlers */
1463 /*****************************/
1464
1465 /**
1466  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1467  *
1468  * @param cls the closure
1469  * @param tunnel connection to the other end
1470  * @param tunnel_ctx the socket
1471  * @param sender who sent the message
1472  * @param message the actual message
1473  * @param atsi performance data for the connection
1474  * @return GNUNET_OK to keep the connection open,
1475  *         GNUNET_SYSERR to close it (signal serious error)
1476  */
1477 static int
1478 server_handle_data (void *cls,
1479                     struct GNUNET_MESH_Tunnel *tunnel,
1480                     void **tunnel_ctx,
1481                     const struct GNUNET_PeerIdentity *sender,
1482                     const struct GNUNET_MessageHeader *message,
1483                     const struct GNUNET_ATS_Information*atsi)
1484 {
1485   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1486
1487   return handle_data (socket,
1488                       tunnel,
1489                       sender,
1490                       (const struct GNUNET_STREAM_DataMessage *)message,
1491                       atsi);
1492 }
1493
1494
1495 /**
1496  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1497  *
1498  * @param cls the closure
1499  * @param tunnel connection to the other end
1500  * @param tunnel_ctx the socket
1501  * @param sender who sent the message
1502  * @param message the actual message
1503  * @param atsi performance data for the connection
1504  * @return GNUNET_OK to keep the connection open,
1505  *         GNUNET_SYSERR to close it (signal serious error)
1506  */
1507 static int
1508 server_handle_hello (void *cls,
1509                      struct GNUNET_MESH_Tunnel *tunnel,
1510                      void **tunnel_ctx,
1511                      const struct GNUNET_PeerIdentity *sender,
1512                      const struct GNUNET_MessageHeader *message,
1513                      const struct GNUNET_ATS_Information*atsi)
1514 {
1515   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1516   struct GNUNET_STREAM_HelloAckMessage *reply;
1517
1518   if (GNUNET_PEER_search (sender) != socket->other_peer)
1519     {
1520       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1521                   "%x: Received HELLO from non-confirming peer\n",
1522                   socket->our_id);
1523       return GNUNET_YES;
1524     }
1525
1526   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
1527                  ntohs (message->type));
1528   GNUNET_assert (socket->tunnel == tunnel);
1529   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530               "%x: Received HELLO from %x\n", 
1531               socket->our_id,
1532               socket->other_peer);
1533
1534   if (STATE_INIT == socket->state)
1535     {
1536       /* Get the random sequence number */
1537       socket->write_sequence_number = 
1538         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1539       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540                   "%x: Generated write sequence number %u\n",
1541                   socket->our_id,
1542                   (unsigned int) socket->write_sequence_number);
1543       reply = 
1544         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1545       reply->header.header.size = 
1546         htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1547       reply->header.header.type = 
1548         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1549       reply->sequence_number = htonl (socket->write_sequence_number);
1550       reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1551       queue_message (socket, 
1552                      &reply->header,
1553                      &set_state_hello_wait, 
1554                      NULL);
1555     }
1556   else
1557     {
1558       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1559                   "Client sent HELLO when in state %d\n", socket->state);
1560       /* FIXME: Send RESET? */
1561       
1562     }
1563   return GNUNET_OK;
1564 }
1565
1566
1567 /**
1568  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1569  *
1570  * @param cls the closure
1571  * @param tunnel connection to the other end
1572  * @param tunnel_ctx the socket
1573  * @param sender who sent the message
1574  * @param message the actual message
1575  * @param atsi performance data for the connection
1576  * @return GNUNET_OK to keep the connection open,
1577  *         GNUNET_SYSERR to close it (signal serious error)
1578  */
1579 static int
1580 server_handle_hello_ack (void *cls,
1581                          struct GNUNET_MESH_Tunnel *tunnel,
1582                          void **tunnel_ctx,
1583                          const struct GNUNET_PeerIdentity *sender,
1584                          const struct GNUNET_MessageHeader *message,
1585                          const struct GNUNET_ATS_Information*atsi)
1586 {
1587   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1588   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1589
1590   GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
1591                  ntohs (message->type));
1592   GNUNET_assert (socket->tunnel == tunnel);
1593   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1594   if (STATE_HELLO_WAIT == socket->state)
1595     {
1596       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1597                   "%x: Received HELLO_ACK from %x\n",
1598                   socket->our_id,
1599                   socket->other_peer);
1600       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1602                   "%x: Read sequence number %u\n",
1603                   socket->our_id,
1604                   (unsigned int) socket->read_sequence_number);
1605       socket->receiver_window_available = 
1606         ntohl (ack_message->receiver_window_size);
1607       /* Attain ESTABLISHED state */
1608       set_state_established (NULL, socket);
1609     }
1610   else
1611     {
1612       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1613                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1614       /* FIXME: Send RESET? */
1615       
1616     }
1617   return GNUNET_OK;
1618 }
1619
1620
1621 /**
1622  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1623  *
1624  * @param cls the closure
1625  * @param tunnel connection to the other end
1626  * @param tunnel_ctx the socket
1627  * @param sender who sent the message
1628  * @param message the actual message
1629  * @param atsi performance data for the connection
1630  * @return GNUNET_OK to keep the connection open,
1631  *         GNUNET_SYSERR to close it (signal serious error)
1632  */
1633 static int
1634 server_handle_reset (void *cls,
1635                      struct GNUNET_MESH_Tunnel *tunnel,
1636                      void **tunnel_ctx,
1637                      const struct GNUNET_PeerIdentity *sender,
1638                      const struct GNUNET_MessageHeader *message,
1639                      const struct GNUNET_ATS_Information*atsi)
1640 {
1641   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1642
1643   return GNUNET_OK;
1644 }
1645
1646
1647 /**
1648  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1649  *
1650  * @param cls the closure
1651  * @param tunnel connection to the other end
1652  * @param tunnel_ctx the socket
1653  * @param sender who sent the message
1654  * @param message the actual message
1655  * @param atsi performance data for the connection
1656  * @return GNUNET_OK to keep the connection open,
1657  *         GNUNET_SYSERR to close it (signal serious error)
1658  */
1659 static int
1660 server_handle_transmit_close (void *cls,
1661                               struct GNUNET_MESH_Tunnel *tunnel,
1662                               void **tunnel_ctx,
1663                               const struct GNUNET_PeerIdentity *sender,
1664                               const struct GNUNET_MessageHeader *message,
1665                               const struct GNUNET_ATS_Information*atsi)
1666 {
1667   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1668
1669   return handle_transmit_close (socket,
1670                                 tunnel,
1671                                 sender,
1672                                 (struct GNUNET_STREAM_MessageHeader *)message,
1673                                 atsi);
1674 }
1675
1676
1677 /**
1678  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1679  *
1680  * @param cls the closure
1681  * @param tunnel connection to the other end
1682  * @param tunnel_ctx the socket
1683  * @param sender who sent the message
1684  * @param message the actual message
1685  * @param atsi performance data for the connection
1686  * @return GNUNET_OK to keep the connection open,
1687  *         GNUNET_SYSERR to close it (signal serious error)
1688  */
1689 static int
1690 server_handle_transmit_close_ack (void *cls,
1691                                   struct GNUNET_MESH_Tunnel *tunnel,
1692                                   void **tunnel_ctx,
1693                                   const struct GNUNET_PeerIdentity *sender,
1694                                   const struct GNUNET_MessageHeader *message,
1695                                   const struct GNUNET_ATS_Information*atsi)
1696 {
1697   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1698
1699   return GNUNET_OK;
1700 }
1701
1702
1703 /**
1704  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1705  *
1706  * @param cls the closure
1707  * @param tunnel connection to the other end
1708  * @param tunnel_ctx the socket
1709  * @param sender who sent the message
1710  * @param message the actual message
1711  * @param atsi performance data for the connection
1712  * @return GNUNET_OK to keep the connection open,
1713  *         GNUNET_SYSERR to close it (signal serious error)
1714  */
1715 static int
1716 server_handle_receive_close (void *cls,
1717                              struct GNUNET_MESH_Tunnel *tunnel,
1718                              void **tunnel_ctx,
1719                              const struct GNUNET_PeerIdentity *sender,
1720                              const struct GNUNET_MessageHeader *message,
1721                              const struct GNUNET_ATS_Information*atsi)
1722 {
1723   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1724
1725   return GNUNET_OK;
1726 }
1727
1728
1729 /**
1730  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1731  *
1732  * @param cls the closure
1733  * @param tunnel connection to the other end
1734  * @param tunnel_ctx the socket
1735  * @param sender who sent the message
1736  * @param message the actual message
1737  * @param atsi performance data for the connection
1738  * @return GNUNET_OK to keep the connection open,
1739  *         GNUNET_SYSERR to close it (signal serious error)
1740  */
1741 static int
1742 server_handle_receive_close_ack (void *cls,
1743                                  struct GNUNET_MESH_Tunnel *tunnel,
1744                                  void **tunnel_ctx,
1745                                  const struct GNUNET_PeerIdentity *sender,
1746                                  const struct GNUNET_MessageHeader *message,
1747                                  const struct GNUNET_ATS_Information*atsi)
1748 {
1749   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1750
1751   return GNUNET_OK;
1752 }
1753
1754
1755 /**
1756  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1757  *
1758  * @param cls the closure
1759  * @param tunnel connection to the other end
1760  * @param tunnel_ctx the socket
1761  * @param sender who sent the message
1762  * @param message the actual message
1763  * @param atsi performance data for the connection
1764  * @return GNUNET_OK to keep the connection open,
1765  *         GNUNET_SYSERR to close it (signal serious error)
1766  */
1767 static int
1768 server_handle_close (void *cls,
1769                      struct GNUNET_MESH_Tunnel *tunnel,
1770                      void **tunnel_ctx,
1771                      const struct GNUNET_PeerIdentity *sender,
1772                      const struct GNUNET_MessageHeader *message,
1773                      const struct GNUNET_ATS_Information*atsi)
1774 {
1775   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1776
1777   return GNUNET_OK;
1778 }
1779
1780
1781 /**
1782  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1783  *
1784  * @param cls the closure
1785  * @param tunnel connection to the other end
1786  * @param tunnel_ctx the socket
1787  * @param sender who sent the message
1788  * @param message the actual message
1789  * @param atsi performance data for the connection
1790  * @return GNUNET_OK to keep the connection open,
1791  *         GNUNET_SYSERR to close it (signal serious error)
1792  */
1793 static int
1794 server_handle_close_ack (void *cls,
1795                          struct GNUNET_MESH_Tunnel *tunnel,
1796                          void **tunnel_ctx,
1797                          const struct GNUNET_PeerIdentity *sender,
1798                          const struct GNUNET_MessageHeader *message,
1799                          const struct GNUNET_ATS_Information*atsi)
1800 {
1801   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1802
1803   return GNUNET_OK;
1804 }
1805
1806
1807 /**
1808  * Message Handler for mesh
1809  *
1810  * @param socket the socket through which the ack was received
1811  * @param tunnel connection to the other end
1812  * @param sender who sent the message
1813  * @param ack the acknowledgment message
1814  * @param atsi performance data for the connection
1815  * @return GNUNET_OK to keep the connection open,
1816  *         GNUNET_SYSERR to close it (signal serious error)
1817  */
1818 static int
1819 handle_ack (struct GNUNET_STREAM_Socket *socket,
1820             struct GNUNET_MESH_Tunnel *tunnel,
1821             const struct GNUNET_PeerIdentity *sender,
1822             const struct GNUNET_STREAM_AckMessage *ack,
1823             const struct GNUNET_ATS_Information*atsi)
1824 {
1825   unsigned int packet;
1826   int need_retransmission;
1827
1828   if (GNUNET_PEER_search (sender) != socket->other_peer)
1829     {
1830       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1831                   "%x: Received ACK from non-confirming peer\n",
1832                   socket->our_id);
1833       return GNUNET_YES;
1834     }
1835
1836   switch (socket->state)
1837     {
1838     case (STATE_ESTABLISHED):
1839       if (NULL == socket->write_handle)
1840         {
1841           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1842                       "%x: Received DATA_ACK when write_handle is NULL\n",
1843                       socket->our_id);
1844           return GNUNET_OK;
1845         }
1846       
1847       if (!((socket->write_sequence_number 
1848              - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1849         {
1850           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1851                       "%x: Received DATA_ACK with unexpected base sequence",
1852                       "number\n",
1853                       socket->our_id);
1854           return GNUNET_OK;
1855         }
1856       /* FIXME: include the case when write_handle is cancelled - ignore the 
1857          acks */
1858
1859       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1860                   "%x: Received DATA_ACK from %x\n",
1861                   socket->our_id,
1862                   socket->other_peer);
1863       
1864       /* Cancel the retransmission task */
1865       if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
1866         {
1867           GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
1868           socket->retransmission_timeout_task_id = 
1869             GNUNET_SCHEDULER_NO_TASK;
1870         }
1871
1872       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1873       socket->receiver_window_available = 
1874         ntohl (ack->receive_window_remaining);
1875
1876       /* Check if we have received all acknowledgements */
1877       need_retransmission = GNUNET_NO;
1878       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1879         {
1880           if (NULL == socket->write_handle->messages[packet]) break;
1881           if (GNUNET_YES != ackbitmap_is_bit_set 
1882               (&socket->write_handle->ack_bitmap,packet))
1883             {
1884               need_retransmission = GNUNET_YES;
1885               break;
1886             }
1887         }
1888       if (GNUNET_YES == need_retransmission)
1889         {
1890           write_data (socket);
1891         }
1892       else      /* We have to call the write continuation callback now */
1893         {
1894           /* Free the packets */
1895           for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1896             {
1897               GNUNET_free_non_null (socket->write_handle->messages[packet]);
1898             }
1899           if (NULL != socket->write_handle->write_cont)
1900             socket->write_handle->write_cont
1901               (socket->write_handle->write_cont_cls,
1902                socket->status,
1903                socket->write_handle->size);
1904           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1905                       "%x: Write completion callback completed\n",
1906                       socket->our_id);
1907           /* We are done with the write handle - Freeing it */
1908           GNUNET_free (socket->write_handle);
1909           socket->write_handle = NULL;
1910         }
1911       break;
1912     default:
1913       break;
1914     }
1915   return GNUNET_OK;
1916 }
1917
1918
1919 /**
1920  * Message Handler for mesh
1921  *
1922  * @param cls the 'struct GNUNET_STREAM_Socket'
1923  * @param tunnel connection to the other end
1924  * @param tunnel_ctx unused
1925  * @param sender who sent the message
1926  * @param message the actual message
1927  * @param atsi performance data for the connection
1928  * @return GNUNET_OK to keep the connection open,
1929  *         GNUNET_SYSERR to close it (signal serious error)
1930  */
1931 static int
1932 client_handle_ack (void *cls,
1933                    struct GNUNET_MESH_Tunnel *tunnel,
1934                    void **tunnel_ctx,
1935                    const struct GNUNET_PeerIdentity *sender,
1936                    const struct GNUNET_MessageHeader *message,
1937                    const struct GNUNET_ATS_Information*atsi)
1938 {
1939   struct GNUNET_STREAM_Socket *socket = cls;
1940   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1941  
1942   return handle_ack (socket, tunnel, sender, ack, atsi);
1943 }
1944
1945
1946 /**
1947  * Message Handler for mesh
1948  *
1949  * @param cls the server's listen socket
1950  * @param tunnel connection to the other end
1951  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1952  * @param sender who sent the message
1953  * @param message the actual message
1954  * @param atsi performance data for the connection
1955  * @return GNUNET_OK to keep the connection open,
1956  *         GNUNET_SYSERR to close it (signal serious error)
1957  */
1958 static int
1959 server_handle_ack (void *cls,
1960                    struct GNUNET_MESH_Tunnel *tunnel,
1961                    void **tunnel_ctx,
1962                    const struct GNUNET_PeerIdentity *sender,
1963                    const struct GNUNET_MessageHeader *message,
1964                    const struct GNUNET_ATS_Information*atsi)
1965 {
1966   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1967   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1968  
1969   return handle_ack (socket, tunnel, sender, ack, atsi);
1970 }
1971
1972
1973 /**
1974  * For client message handlers, the stream socket is in the
1975  * closure argument.
1976  */
1977 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1978   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1979   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1980    sizeof (struct GNUNET_STREAM_AckMessage) },
1981   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1982    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1983   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1984    sizeof (struct GNUNET_STREAM_MessageHeader)},
1985   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1986    sizeof (struct GNUNET_STREAM_MessageHeader)},
1987   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1988    sizeof (struct GNUNET_STREAM_MessageHeader)},
1989   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1990    sizeof (struct GNUNET_STREAM_MessageHeader)},
1991   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1992    sizeof (struct GNUNET_STREAM_MessageHeader)},
1993   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1994    sizeof (struct GNUNET_STREAM_MessageHeader)},
1995   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1996    sizeof (struct GNUNET_STREAM_MessageHeader)},
1997   {NULL, 0, 0}
1998 };
1999
2000
2001 /**
2002  * For server message handlers, the stream socket is in the
2003  * tunnel context, and the listen socket in the closure argument.
2004  */
2005 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2006   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2007   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
2008    sizeof (struct GNUNET_STREAM_AckMessage) },
2009   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
2010    sizeof (struct GNUNET_STREAM_MessageHeader)},
2011   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2012    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2013   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2014    sizeof (struct GNUNET_STREAM_MessageHeader)},
2015   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2016    sizeof (struct GNUNET_STREAM_MessageHeader)},
2017   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2018    sizeof (struct GNUNET_STREAM_MessageHeader)},
2019   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2020    sizeof (struct GNUNET_STREAM_MessageHeader)},
2021   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2022    sizeof (struct GNUNET_STREAM_MessageHeader)},
2023   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2024    sizeof (struct GNUNET_STREAM_MessageHeader)},
2025   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2026    sizeof (struct GNUNET_STREAM_MessageHeader)},
2027   {NULL, 0, 0}
2028 };
2029
2030
2031 /**
2032  * Function called when our target peer is connected to our tunnel
2033  *
2034  * @param cls the socket for which this tunnel is created
2035  * @param peer the peer identity of the target
2036  * @param atsi performance data for the connection
2037  */
2038 static void
2039 mesh_peer_connect_callback (void *cls,
2040                             const struct GNUNET_PeerIdentity *peer,
2041                             const struct GNUNET_ATS_Information * atsi)
2042 {
2043   struct GNUNET_STREAM_Socket *socket = cls;
2044   struct GNUNET_STREAM_MessageHeader *message;
2045   GNUNET_PEER_Id connected_peer;
2046
2047   connected_peer = GNUNET_PEER_search (peer);
2048   
2049   if (connected_peer != socket->other_peer)
2050     {
2051       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052                   "%x: A peer which is not our target has connected",
2053                   "to our tunnel\n",
2054                   socket->our_id);
2055       return;
2056     }
2057   
2058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2059               "%x: Target peer %x connected\n", 
2060               socket->our_id,
2061               connected_peer);
2062   
2063   /* Set state to INIT */
2064   socket->state = STATE_INIT;
2065
2066   /* Send HELLO message */
2067   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2068   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2069   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2070   queue_message (socket,
2071                  message,
2072                  &set_state_hello_wait,
2073                  NULL);
2074
2075   /* Call open callback */
2076   if (NULL == socket->open_cb)
2077     {
2078       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2079                   "STREAM_open callback is NULL\n");
2080     }
2081 }
2082
2083
2084 /**
2085  * Function called when our target peer is disconnected from our tunnel
2086  *
2087  * @param cls the socket associated which this tunnel
2088  * @param peer the peer identity of the target
2089  */
2090 static void
2091 mesh_peer_disconnect_callback (void *cls,
2092                                const struct GNUNET_PeerIdentity *peer)
2093 {
2094
2095 }
2096
2097
2098 /**
2099  * Method called whenever a peer creates a tunnel to us
2100  *
2101  * @param cls closure
2102  * @param tunnel new handle to the tunnel
2103  * @param initiator peer that started the tunnel
2104  * @param atsi performance information for the tunnel
2105  * @return initial tunnel context for the tunnel
2106  *         (can be NULL -- that's not an error)
2107  */
2108 static void *
2109 new_tunnel_notify (void *cls,
2110                    struct GNUNET_MESH_Tunnel *tunnel,
2111                    const struct GNUNET_PeerIdentity *initiator,
2112                    const struct GNUNET_ATS_Information *atsi)
2113 {
2114   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2115   struct GNUNET_STREAM_Socket *socket;
2116
2117   /* FIXME: If a tunnel is already created, we should not accept new tunnels
2118      from the same peer again until the socket is closed */
2119
2120   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2121   socket->other_peer = GNUNET_PEER_intern (initiator);
2122   socket->tunnel = tunnel;
2123   socket->session_id = 0;       /* FIXME */
2124   socket->state = STATE_INIT;
2125   socket->derived = GNUNET_YES;
2126   socket->our_id = lsocket->our_id;
2127   
2128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2129               "%x: Peer %x initiated tunnel to us\n", 
2130               socket->our_id,
2131               socket->other_peer);
2132   
2133   /* FIXME: Copy MESH handle from lsocket to socket */
2134   /* FIXME: What if listen_cb is NULL */
2135   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
2136                                            socket,
2137                                            initiator))
2138     {
2139       socket->state = STATE_CLOSED;
2140       /* FIXME: Send CLOSE message and then free */
2141       GNUNET_free (socket);
2142       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
2143     }
2144   return socket;
2145 }
2146
2147
2148 /**
2149  * Function called whenever an inbound tunnel is destroyed.  Should clean up
2150  * any associated state.  This function is NOT called if the client has
2151  * explicitly asked for the tunnel to be destroyed using
2152  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2153  * the tunnel.
2154  *
2155  * @param cls closure (set from GNUNET_MESH_connect)
2156  * @param tunnel connection to the other end (henceforth invalid)
2157  * @param tunnel_ctx place where local state associated
2158  *                   with the tunnel is stored
2159  */
2160 static void 
2161 tunnel_cleaner (void *cls,
2162                 const struct GNUNET_MESH_Tunnel *tunnel,
2163                 void *tunnel_ctx)
2164 {
2165   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2166   
2167   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2168               "%x: Peer %x has terminated connection abruptly\n",
2169               socket->our_id,
2170               socket->other_peer);
2171
2172   socket->status = GNUNET_STREAM_SHUTDOWN;
2173
2174   /* Clear Transmit handles */
2175   if (NULL != socket->transmit_handle)
2176     {
2177       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2178       socket->transmit_handle = NULL;
2179     }
2180   socket->tunnel = NULL;
2181 }
2182
2183
2184 /*****************/
2185 /* API functions */
2186 /*****************/
2187
2188
2189 /**
2190  * Tries to open a stream to the target peer
2191  *
2192  * @param cfg configuration to use
2193  * @param target the target peer to which the stream has to be opened
2194  * @param app_port the application port number which uniquely identifies this
2195  *            stream
2196  * @param open_cb this function will be called after stream has be established 
2197  * @param open_cb_cls the closure for open_cb
2198  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2199  * @return if successful it returns the stream socket; NULL if stream cannot be
2200  *         opened 
2201  */
2202 struct GNUNET_STREAM_Socket *
2203 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2204                     const struct GNUNET_PeerIdentity *target,
2205                     GNUNET_MESH_ApplicationType app_port,
2206                     GNUNET_STREAM_OpenCallback open_cb,
2207                     void *open_cb_cls,
2208                     ...)
2209 {
2210   struct GNUNET_STREAM_Socket *socket;
2211   struct GNUNET_PeerIdentity own_peer_id;
2212   enum GNUNET_STREAM_Option option;
2213   va_list vargs;                /* Variable arguments */
2214
2215   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2216               "%s\n", __func__);
2217
2218   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2219   socket->other_peer = GNUNET_PEER_intern (target);
2220   socket->open_cb = open_cb;
2221   socket->open_cls = open_cb_cls;
2222   GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
2223   socket->our_id = GNUNET_PEER_intern (&own_peer_id);
2224   
2225   /* Set defaults */
2226   socket->retransmit_timeout = 
2227     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2228
2229   va_start (vargs, open_cb_cls); /* Parse variable args */
2230   do {
2231     option = va_arg (vargs, enum GNUNET_STREAM_Option);
2232     switch (option)
2233       {
2234       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2235         /* Expect struct GNUNET_TIME_Relative */
2236         socket->retransmit_timeout = va_arg (vargs,
2237                                              struct GNUNET_TIME_Relative);
2238         break;
2239       case GNUNET_STREAM_OPTION_END:
2240         break;
2241       }
2242   } while (GNUNET_STREAM_OPTION_END != option);
2243   va_end (vargs);               /* End of variable args parsing */
2244   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
2245                                       10,  /* QUEUE size as parameter? */
2246                                       socket, /* cls */
2247                                       NULL, /* No inbound tunnel handler */
2248                                       &tunnel_cleaner, /* FIXME: not required? */
2249                                       client_message_handlers,
2250                                       &app_port); /* We don't get inbound tunnels */
2251   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
2252     {
2253       GNUNET_free (socket);
2254       return NULL;
2255     }
2256
2257   /* Now create the mesh tunnel to target */
2258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2259               "Creating MESH Tunnel\n");
2260   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2261                                               NULL, /* Tunnel context */
2262                                               &mesh_peer_connect_callback,
2263                                               &mesh_peer_disconnect_callback,
2264                                               socket);
2265   GNUNET_assert (NULL != socket->tunnel);
2266   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2267                                         target);
2268   
2269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2270               "%s() END\n", __func__);
2271   return socket;
2272 }
2273
2274
2275 /**
2276  * Shutdown the stream for reading or writing (man 2 shutdown).
2277  *
2278  * @param socket the stream socket
2279  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
2280  */
2281 void
2282 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2283                         int how)
2284 {
2285   return;
2286 }
2287
2288
2289 /**
2290  * Closes the stream
2291  *
2292  * @param socket the stream socket
2293  */
2294 void
2295 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2296 {
2297   struct MessageQueue *head;
2298
2299   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2300   {
2301     /* socket closed with read task pending!? */
2302     GNUNET_break (0);
2303     GNUNET_SCHEDULER_cancel (socket->read_task_id);
2304     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2305   }
2306
2307   /* Clear Transmit handles */
2308   if (NULL != socket->transmit_handle)
2309     {
2310       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2311       socket->transmit_handle = NULL;
2312     }
2313
2314   /* Clear existing message queue */
2315   while (NULL != (head = socket->queue_head)) {
2316     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2317                                  socket->queue_tail,
2318                                  head);
2319     GNUNET_free (head->message);
2320     GNUNET_free (head);
2321   }
2322
2323   /* Close associated tunnel */
2324   if (NULL != socket->tunnel)
2325     {
2326       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2327       socket->tunnel = NULL;
2328     }
2329
2330   /* Close mesh connection */
2331   if (NULL != socket->mesh && GNUNET_YES != socket->derived)
2332     {
2333       GNUNET_MESH_disconnect (socket->mesh);
2334       socket->mesh = NULL;
2335     }
2336   
2337   /* Release receive buffer */
2338   if (NULL != socket->receive_buffer)
2339     {
2340       GNUNET_free (socket->receive_buffer);
2341     }
2342
2343   GNUNET_free (socket);
2344 }
2345
2346
2347 /**
2348  * Listens for stream connections for a specific application ports
2349  *
2350  * @param cfg the configuration to use
2351  * @param app_port the application port for which new streams will be accepted
2352  * @param listen_cb this function will be called when a peer tries to establish
2353  *            a stream with us
2354  * @param listen_cb_cls closure for listen_cb
2355  * @return listen socket, NULL for any error
2356  */
2357 struct GNUNET_STREAM_ListenSocket *
2358 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2359                       GNUNET_MESH_ApplicationType app_port,
2360                       GNUNET_STREAM_ListenCallback listen_cb,
2361                       void *listen_cb_cls)
2362 {
2363   /* FIXME: Add variable args for passing configration options? */
2364   struct GNUNET_STREAM_ListenSocket *lsocket;
2365   struct GNUNET_PeerIdentity our_peer_id;
2366
2367   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2368   lsocket->port = app_port;
2369   lsocket->listen_cb = listen_cb;
2370   lsocket->listen_cb_cls = listen_cb_cls;
2371   GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
2372   lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
2373   lsocket->mesh = GNUNET_MESH_connect (cfg,
2374                                        10, /* FIXME: QUEUE size as parameter? */
2375                                        lsocket, /* Closure */
2376                                        &new_tunnel_notify,
2377                                        &tunnel_cleaner,
2378                                        server_message_handlers,
2379                                        &app_port);
2380   GNUNET_assert (NULL != lsocket->mesh);
2381   return lsocket;
2382 }
2383
2384
2385 /**
2386  * Closes the listen socket
2387  *
2388  * @param lsocket the listen socket
2389  */
2390 void
2391 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2392 {
2393   /* Close MESH connection */
2394   GNUNET_assert (NULL != lsocket->mesh);
2395   GNUNET_MESH_disconnect (lsocket->mesh);
2396   
2397   GNUNET_free (lsocket);
2398 }
2399
2400
2401 /**
2402  * Tries to write the given data to the stream
2403  *
2404  * @param socket the socket representing a stream
2405  * @param data the data buffer from where the data is written into the stream
2406  * @param size the number of bytes to be written from the data buffer
2407  * @param timeout the timeout period
2408  * @param write_cont the function to call upon writing some bytes into the stream
2409  * @param write_cont_cls the closure
2410  * @return handle to cancel the operation
2411  */
2412 struct GNUNET_STREAM_IOWriteHandle *
2413 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2414                      const void *data,
2415                      size_t size,
2416                      struct GNUNET_TIME_Relative timeout,
2417                      GNUNET_STREAM_CompletionContinuation write_cont,
2418                      void *write_cont_cls)
2419 {
2420   unsigned int num_needed_packets;
2421   unsigned int packet;
2422   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2423   uint32_t packet_size;
2424   uint32_t payload_size;
2425   struct GNUNET_STREAM_DataMessage *data_msg;
2426   const void *sweep;
2427   struct GNUNET_TIME_Relative ack_deadline;
2428
2429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2430               "%s\n", __func__);
2431
2432   /* Return NULL if there is already a write request pending */
2433   if (NULL != socket->write_handle)
2434   {
2435     GNUNET_break (0);
2436     return NULL;
2437   }
2438   if (!((STATE_ESTABLISHED == socket->state)
2439         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2440         || (STATE_RECEIVE_CLOSED == socket->state)))
2441     {
2442       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2443                   "%x: Attempting to write on a closed (OR) not-yet-established"
2444                   "stream\n",
2445                   socket->our_id);
2446       return NULL;
2447     } 
2448   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2449     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2450   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2451   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2452   io_handle->write_cont = write_cont;
2453   io_handle->write_cont_cls = write_cont_cls;
2454   io_handle->size = size;
2455   sweep = data;
2456   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2457      determined from RTT */
2458   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
2459   /* Divide the given buffer into packets for sending */
2460   for (packet=0; packet < num_needed_packets; packet++)
2461     {
2462       if ((packet + 1) * max_payload_size < size) 
2463         {
2464           payload_size = max_payload_size;
2465           packet_size = MAX_PACKET_SIZE;
2466         }
2467       else 
2468         {
2469           payload_size = size - packet * max_payload_size;
2470           packet_size =  payload_size + sizeof (struct
2471                                                 GNUNET_STREAM_DataMessage); 
2472         }
2473       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2474       io_handle->messages[packet]->header.header.size = htons (packet_size);
2475       io_handle->messages[packet]->header.header.type =
2476         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2477       io_handle->messages[packet]->sequence_number =
2478         htonl (socket->write_sequence_number++);
2479       io_handle->messages[packet]->offset = htonl (socket->write_offset);
2480
2481       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2482          determined from RTT */
2483       io_handle->messages[packet]->ack_deadline =
2484         GNUNET_TIME_relative_hton (ack_deadline);
2485       data_msg = io_handle->messages[packet];
2486       /* Copy data from given buffer to the packet */
2487       memcpy (&data_msg[1],
2488               sweep,
2489               payload_size);
2490       sweep += payload_size;
2491       socket->write_offset += payload_size;
2492     }
2493   socket->write_handle = io_handle;
2494   write_data (socket);
2495
2496   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2497               "%s() END\n", __func__);
2498
2499   return io_handle;
2500 }
2501
2502
2503 /**
2504  * Tries to read data from the stream
2505  *
2506  * @param socket the socket representing a stream
2507  * @param timeout the timeout period
2508  * @param proc function to call with data (once only)
2509  * @param proc_cls the closure for proc
2510  * @return handle to cancel the operation
2511  */
2512 struct GNUNET_STREAM_IOReadHandle *
2513 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2514                     struct GNUNET_TIME_Relative timeout,
2515                     GNUNET_STREAM_DataProcessor proc,
2516                     void *proc_cls)
2517 {
2518   struct GNUNET_STREAM_IOReadHandle *read_handle;
2519   
2520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2521               "%s()\n", __func__);
2522
2523   /* Return NULL if there is already a read handle; the user has to cancel that
2524   first before continuing or has to wait until it is completed */
2525   if (NULL != socket->read_handle) return NULL;
2526
2527   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2528   read_handle->proc = proc;
2529   socket->read_handle = read_handle;
2530
2531   /* Check if we have a packet at bitmap 0 */
2532   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2533                                           0))
2534     {
2535       socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
2536                                                        socket);
2537    
2538     }
2539   
2540   /* Setup the read timeout task */
2541   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2542                                                                &read_io_timeout,
2543                                                                socket);
2544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2545               "%s() END\n", __func__);
2546   return read_handle;
2547 }
2548
2549
2550 /**
2551  * Cancel pending write operation.
2552  *
2553  * @param ioh handle to operation to cancel
2554  */
2555 void
2556 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2557 {
2558   return;
2559 }
2560
2561
2562 /**
2563  * Cancel pending read operation.
2564  *
2565  * @param ioh handle to operation to cancel
2566  */
2567 void
2568 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2569 {
2570   return;
2571 }