6f7a7f2ce94e6e6e650c5cc86634704b3664bea9
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param socket the socket the written message was bound to
110  */
111 typedef void (*SendFinishCallback) (void *cls,
112                                     struct GNUNET_STREAM_Socket *socket);
113
114
115 /**
116  * The send message queue
117  */
118 struct MessageQueue
119 {
120   /**
121    * The message
122    */
123   struct GNUNET_STREAM_MessageHeader *message;
124
125   /**
126    * Callback to be called when the message is sent
127    */
128   SendFinishCallback finish_cb;
129
130   /**
131    * The closure for finish_cb
132    */
133   void *finish_cb_cls;
134
135   /**
136    * The next message in queue. Should be NULL in the last message
137    */
138   struct MessageQueue *next;
139
140   /**
141    * The next message in queue. Should be NULL in the last message
142    */
143   struct MessageQueue *prev;
144 };
145
146
147 /**
148  * The STREAM Socket Handler
149  */
150 struct GNUNET_STREAM_Socket
151 {
152
153   /**
154    * The peer identity of the peer at the other end of the stream
155    */
156   struct GNUNET_PeerIdentity other_peer;
157
158   /**
159    * Retransmission timeout
160    */
161   struct GNUNET_TIME_Relative retransmit_timeout;
162
163   /**
164    * The Acknowledgement Bitmap
165    */
166   GNUNET_STREAM_AckBitmap ack_bitmap;
167
168   /**
169    * Time when the Acknowledgement was queued
170    */
171   struct GNUNET_TIME_Absolute ack_time_registered;
172
173   /**
174    * Queued Acknowledgement deadline
175    */
176   struct GNUNET_TIME_Relative ack_time_deadline;
177
178   /**
179    * The task for sending timely Acks
180    */
181   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
182
183   /**
184    * The mesh handle
185    */
186   struct GNUNET_MESH_Handle *mesh;
187
188   /**
189    * The mesh tunnel handle
190    */
191   struct GNUNET_MESH_Tunnel *tunnel;
192
193   /**
194    * Stream open closure
195    */
196   void *open_cls;
197
198   /**
199    * Stream open callback
200    */
201   GNUNET_STREAM_OpenCallback open_cb;
202
203   /**
204    * The current transmit handle (if a pending transmit request exists)
205    */
206   struct GNUNET_MESH_TransmitHandle *transmit_handle;
207
208   /**
209    * The current message associated with the transmit handle
210    */
211   struct MessageQueue *queue_head;
212
213   /**
214    * The queue tail, should always point to the last message in queue
215    */
216   struct MessageQueue *queue_tail;
217
218   /**
219    * The write IO_handle associated with this socket
220    */
221   struct GNUNET_STREAM_IOHandle *write_handle;
222
223   /**
224    * The read IO_handle associated with this socket
225    */
226   struct GNUNET_STREAM_IOHandle *read_handle;
227
228   /**
229    * Buffer for storing received messages
230    */
231   void *receive_buffer;
232
233   /**
234    * The state of the protocol associated with this socket
235    */
236   enum State state;
237
238   /**
239    * The status of the socket
240    */
241   enum GNUNET_STREAM_Status status;
242
243   /**
244    * The number of previous timeouts; FIXME: currently not used
245    */
246   unsigned int retries;
247
248   /**
249    * The session id associated with this stream connection
250    * FIXME: Not used currently, may be removed
251    */
252   uint32_t session_id;
253
254   /**
255    * Write sequence number. Set to random when sending HELLO(client) and
256    * HELLO_ACK(server) 
257    */
258   uint32_t write_sequence_number;
259
260   /**
261    * Read sequence number. This number's value is determined during handshake
262    */
263   uint32_t read_sequence_number;
264
265   /**
266    * receiver's available buffer after the last acknowledged packet
267    */
268   uint32_t receive_window_available;
269 };
270
271
272 /**
273  * A socket for listening
274  */
275 struct GNUNET_STREAM_ListenSocket
276 {
277
278   /**
279    * The mesh handle
280    */
281   struct GNUNET_MESH_Handle *mesh;
282
283   /**
284    * The callback function which is called after successful opening socket
285    */
286   GNUNET_STREAM_ListenCallback listen_cb;
287
288   /**
289    * The call back closure
290    */
291   void *listen_cb_cls;
292
293   /**
294    * The service port
295    */
296   GNUNET_MESH_ApplicationType port;
297 };
298
299
300 /**
301  * The IO Handle
302  */
303 struct GNUNET_STREAM_IOHandle
304 {
305   /**
306    * The packet_buffers associated with this Handle
307    */
308   struct GNUNET_STREAM_DataMessage *messages[64];
309
310   /**
311    * The bitmap of this IOHandle; Corresponding bit for a message is set when
312    * it has been acknowledged by the receiver
313    */
314   GNUNET_STREAM_AckBitmap ack_bitmap;
315
316   /**
317    * receiver's available buffer
318    */
319   uint32_t receive_window_available;
320
321   /**
322    * Number of packets sent before waiting for an ack
323    *
324    * FIXME: Do we need this?
325    */
326   unsigned int sent_packets;
327 };
328
329
330 /**
331  * Default value in seconds for various timeouts
332  */
333 static unsigned int default_timeout = 300;
334
335
336 /**
337  * Callback function for sending hello message
338  *
339  * @param cls closure the socket
340  * @param size number of bytes available in buf
341  * @param buf where the callee should write the message
342  * @return number of bytes written to buf
343  */
344 static size_t
345 send_message_notify (void *cls, size_t size, void *buf)
346 {
347   struct GNUNET_STREAM_Socket *socket = cls;
348   struct MessageQueue *head;
349   size_t ret;
350
351   socket->transmit_handle = NULL; /* Remove the transmit handle */
352   head = socket->queue_head;
353   if (NULL == head)
354     return 0; /* just to be safe */
355   if (0 == size)                /* request timed out */
356     {
357       socket->retries++;
358       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359                   "Message sending timed out. Retry %d \n",
360                   socket->retries);
361       socket->transmit_handle = 
362         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
363                                            0, /* Corking */
364                                            1, /* Priority */
365                                            /* FIXME: exponential backoff */
366                                            socket->retransmit_timeout,
367                                            &socket->other_peer,
368                                            ntohs (head->message->header.size),
369                                            &send_message_notify,
370                                            socket);
371       return 0;
372     }
373
374   ret = ntohs (head->message->header.size);
375   GNUNET_assert (size >= ret);
376   memcpy (buf, head->message, ret);
377   if (NULL != head->finish_cb)
378     {
379       head->finish_cb (socket, head->finish_cb_cls);
380     }
381   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
382                                socket->queue_tail,
383                                head);
384   GNUNET_free (head->message);
385   GNUNET_free (head);
386   head = socket->queue_head;
387   if (NULL != head)    /* more pending messages to send */
388     {
389       socket->retries = 0;
390       socket->transmit_handle = 
391         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
392                                            0, /* Corking */
393                                            1, /* Priority */
394                                            /* FIXME: exponential backoff */
395                                            socket->retransmit_timeout,
396                                            &socket->other_peer,
397                                            ntohs (head->message->header.size),
398                                            &send_message_notify,
399                                            socket);
400     }
401   return ret;
402 }
403
404
405 /**
406  * Queues a message for sending using the mesh connection of a socket
407  *
408  * @param socket the socket whose mesh connection is used
409  * @param message the message to be sent
410  * @param finish_cb the callback to be called when the message is sent
411  * @param finish_cb_cls the closure for the callback
412  */
413 static void
414 queue_message (struct GNUNET_STREAM_Socket *socket,
415                struct GNUNET_STREAM_MessageHeader *message,
416                SendFinishCallback finish_cb,
417                void *finish_cb_cls)
418 {
419   struct MessageQueue *queue_entity;
420
421   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
422   queue_entity->message = message;
423   queue_entity->finish_cb = finish_cb;
424   queue_entity->finish_cb_cls = finish_cb_cls;
425   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
426                                     socket->queue_tail,
427                                     queue_entity);
428   if (NULL == socket->transmit_handle)
429   {
430     socket->retries = 0;
431     socket->transmit_handle = 
432       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
433                                          0, /* Corking */
434                                          1, /* Priority */
435                                          socket->retransmit_timeout,
436                                          &socket->other_peer,
437                                          ntohs (message->header.size),
438                                          &send_message_notify,
439                                          socket);
440   }
441 }
442
443
444 /**
445  * Callback function for sending ack message
446  *
447  * @param cls closure the ACK message created in ack_task
448  * @param size number of bytes available in buffer
449  * @param buf where the callee should write the message
450  * @return number of bytes written to buf
451  */
452 static size_t
453 send_ack_notify (void *cls, size_t size, void *buf)
454 {
455   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
456
457   if (0 == size)
458     {
459       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
460                   "%s called with size 0\n", __func__);
461       return 0;
462     }
463   GNUNET_assert (ack_msg->header.header.size <= size);
464   
465   size = ack_msg->header.header.size;
466   memcpy (buf, ack_msg, size);
467   return size;
468 }
469
470
471 /**
472  * Task for sending ACK message
473  *
474  * @param cls the socket
475  * @param tc the Task context
476  */
477 static void
478 ack_task (void *cls,
479           const struct GNUNET_SCHEDULER_TaskContext *tc)
480 {
481   struct GNUNET_STREAM_Socket *socket = cls;
482   struct GNUNET_STREAM_AckMessage *ack_msg;
483
484   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
485     {
486       return;
487     }
488
489   socket->ack_task_id = 0;
490
491   /* Create the ACK Message */
492   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
493   ack_msg->header.header.size = htons (sizeof (struct 
494                                                GNUNET_STREAM_AckMessage));
495   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
496   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
497   ack_msg->base_sequence_number = htonl (socket->write_sequence_number);
498   ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
499
500   /* Request MESH for sending ACK */
501   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
502                                      0, /* Corking */
503                                      1, /* Priority */
504                                      socket->retransmit_timeout,
505                                      &socket->other_peer,
506                                      ntohs (ack_msg->header.header.size),
507                                      &send_ack_notify,
508                                      ack_msg);
509
510   
511 }
512
513
514 /**
515  * Function to modify a bit in GNUNET_STREAM_AckBitmap
516  *
517  * @param bitmap the bitmap to modify
518  * @param bit the bit number to modify
519  * @param value GNUNET_YES to on, GNUNET_NO to off
520  */
521 static void
522 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
523                       unsigned int bit, 
524                       int value)
525 {
526   GNUNET_assert (bit < 64);
527   if (GNUNET_YES == value)
528     *bitmap |= (1LL << bit);
529   else
530     *bitmap &= ~(1LL << bit);
531 }
532
533
534 /**
535  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
536  *
537  * @param bitmap address of the bitmap that has to be checked
538  * @param bit the bit number to check
539  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
540  */
541 static uint8_t
542 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
543                       unsigned int bit)
544 {
545   GNUNET_assert (bit < 64);
546   return 0 != (*bitmap & (1LL << bit));
547 }
548
549
550
551 /**
552  * Function called when Data Message is sent
553  *
554  * @param cls the io_handle corresponding to the Data Message
555  * @param socket the socket which was used
556  */
557 static void
558 write_data_finish_cb (void *cls,
559                       struct GNUNET_STREAM_Socket *socket)
560 {
561   struct GNUNET_STREAM_IOHandle *io_handle = cls;
562
563   io_handle->sent_packets++;
564 }
565
566
567 /**
568  * Writes data using the given socket. The amount of data written is limited by
569  * the receive_window_size
570  *
571  * @param socket the socket to use
572  */
573 static void 
574 write_data (struct GNUNET_STREAM_Socket *socket)
575 {
576   struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
577   unsigned int packet;
578   int ack_packet;
579
580   ack_packet = -1;
581   /* Find the last acknowledged packet */
582   for (packet=0; packet < 64; packet++)
583     {
584       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
585                                               packet))
586         {
587           ack_packet = packet;
588         }
589     }
590   /* Resend packets which weren't ack'ed */
591   for (packet=0; packet < ack_packet; packet++)
592     {
593       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
594                                              packet))
595         {
596           queue_message (socket,
597                          &io_handle->messages[packet]->header,
598                          NULL,
599                          NULL);
600         }
601     }
602   packet = ack_packet + 1;
603   /* Now send new packets if there is enough buffer space */
604   while (io_handle->receive_window_available -=
605          io_handle->messages[packet]->header.header.size > 0)
606     {
607       queue_message (socket,
608                      &io_handle->messages[packet]->header,
609                      &write_data_finish_cb,
610                      io_handle);
611     }
612 }
613
614
615 /**
616  * Handler for DATA messages; Same for both client and server
617  *
618  * @param socket the socket through which the ack was received
619  * @param tunnel connection to the other end
620  * @param sender who sent the message
621  * @param ack the acknowledgment message
622  * @param atsi performance data for the connection
623  * @return GNUNET_OK to keep the connection open,
624  *         GNUNET_SYSERR to close it (signal serious error)
625  */
626 static int
627 handle_data (struct GNUNET_STREAM_Socket *socket,
628              struct GNUNET_MESH_Tunnel *tunnel,
629              const struct GNUNET_PeerIdentity *sender,
630              const struct GNUNET_STREAM_DataMessage *msg,
631              const struct GNUNET_ATS_Information*atsi)
632 {
633   uint16_t size;
634   const void *payload;
635
636   size = msg->header.header.size;
637   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
638     {
639       GNUNET_break_op (0);
640       return GNUNET_SYSERR;
641     }
642
643   switch (socket->state)
644     {
645     case STATE_ESTABLISHED:
646     case STATE_TRANSMIT_CLOSED:
647     case STATE_TRANSMIT_CLOSE_WAIT:
648       GNUNET_assert (NULL != socket->receive_buffer);
649       /* check if the message's sequence number is greater than the one we are
650          expecting */
651       if (ntohl (msg->sequence_number) - socket->read_sequence_number < 64)
652         {
653
654         }
655       else                      /* We are receiving a retransmitted message */
656         {
657           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658                       "Message with sequence number %d retransmitted\n",
659                       ntohl (socket->read_sequence_number));
660           return GNUNET_YES;
661         }
662       /* Copy Data to buffer and send acknowledgements */
663       size -= sizeof (struct GNUNET_STREAM_DataMessage);
664       payload = &msg[1];
665       memcpy (socket->receive_buffer 
666               + (ntohl (msg->sequence_number) - socket->read_sequence_number)
667               * MAX_PACKET_SIZE,
668               payload,
669               size);
670       
671       /* Modify the ACK bitmap */
672       ackbitmap_modify_bit (&socket->ack_bitmap,
673                             ntohl (msg->sequence_number) -
674                             socket->read_sequence_number,
675                             GNUNET_YES);
676
677       /* Start ACK sending task if one is not already present */
678       if (0 == socket->ack_task_id)
679        {
680          socket->ack_task_id = 
681            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
682                                          (msg->ack_deadline),
683                                          &ack_task,
684                                          socket);
685        }
686       
687       break;
688
689     default:
690       /* FIXME: call statistics */
691       break;
692     }
693   return GNUNET_YES;
694 }
695
696 /**
697  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
698  *
699  * @param cls the socket (set from GNUNET_MESH_connect)
700  * @param tunnel connection to the other end
701  * @param tunnel_ctx place to store local state associated with the tunnel
702  * @param sender who sent the message
703  * @param message the actual message
704  * @param atsi performance data for the connection
705  * @return GNUNET_OK to keep the connection open,
706  *         GNUNET_SYSERR to close it (signal serious error)
707  */
708 static int
709 client_handle_data (void *cls,
710              struct GNUNET_MESH_Tunnel *tunnel,
711              void **tunnel_ctx,
712              const struct GNUNET_PeerIdentity *sender,
713              const struct GNUNET_MessageHeader *message,
714              const struct GNUNET_ATS_Information*atsi)
715 {
716   struct GNUNET_STREAM_Socket *socket = cls;
717
718   return handle_data (socket, 
719                       tunnel, 
720                       sender, 
721                       (const struct GNUNET_STREAM_DataMessage *) message, 
722                       atsi);
723 }
724
725
726 /**
727  * Callback to set state to ESTABLISHED
728  *
729  * @param cls the closure from queue_message FIXME: document
730  * @param socket the socket to requiring state change
731  */
732 static void
733 set_state_established (void *cls,
734                        struct GNUNET_STREAM_Socket *socket)
735 {
736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
737   /* Initialize the receive buffer */
738   socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE);
739   socket->state = STATE_ESTABLISHED;
740 }
741
742
743 /**
744  * Callback to set state to HELLO_WAIT
745  *
746  * @param cls the closure from queue_message
747  * @param socket the socket to requiring state change
748  */
749 static void
750 set_state_hello_wait (void *cls,
751                       struct GNUNET_STREAM_Socket *socket)
752 {
753   GNUNET_assert (STATE_INIT == socket->state);
754   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
755   socket->state = STATE_HELLO_WAIT;
756 }
757
758
759 /**
760  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
761  *
762  * @param cls the socket (set from GNUNET_MESH_connect)
763  * @param tunnel connection to the other end
764  * @param tunnel_ctx this is NULL
765  * @param sender who sent the message
766  * @param message the actual message
767  * @param atsi performance data for the connection
768  * @return GNUNET_OK to keep the connection open,
769  *         GNUNET_SYSERR to close it (signal serious error)
770  */
771 static int
772 client_handle_hello_ack (void *cls,
773                          struct GNUNET_MESH_Tunnel *tunnel,
774                          void **tunnel_ctx,
775                          const struct GNUNET_PeerIdentity *sender,
776                          const struct GNUNET_MessageHeader *message,
777                          const struct GNUNET_ATS_Information*atsi)
778 {
779   struct GNUNET_STREAM_Socket *socket = cls;
780   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
781   struct GNUNET_STREAM_HelloAckMessage *reply;
782
783   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
784   GNUNET_assert (socket->tunnel == tunnel);
785   switch (socket->state)
786   {
787   case STATE_HELLO_WAIT:
788       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
789       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
790       /* Get the random sequence number */
791       socket->write_sequence_number = 
792         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
793       reply = 
794         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
795       reply->header.header.size = 
796         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
797       reply->header.header.type = 
798         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
799       reply->sequence_number = htonl (socket->write_sequence_number);
800       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
801       queue_message (socket, 
802                      &reply->header, 
803                      &set_state_established, 
804                      NULL);      
805       return GNUNET_OK;
806   case STATE_ESTABLISHED:
807   case STATE_RECEIVE_CLOSE_WAIT:
808     // call statistics (# ACKs ignored++)
809     return GNUNET_OK;
810   case STATE_INIT:
811   default:
812     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813                 "Server sent HELLO_ACK when in state %d\n", socket->state);
814     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
815     return GNUNET_SYSERR;
816   }
817
818 }
819
820
821 /**
822  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
823  *
824  * @param cls the socket (set from GNUNET_MESH_connect)
825  * @param tunnel connection to the other end
826  * @param tunnel_ctx this is NULL
827  * @param sender who sent the message
828  * @param message the actual message
829  * @param atsi performance data for the connection
830  * @return GNUNET_OK to keep the connection open,
831  *         GNUNET_SYSERR to close it (signal serious error)
832  */
833 static int
834 client_handle_reset (void *cls,
835                      struct GNUNET_MESH_Tunnel *tunnel,
836                      void **tunnel_ctx,
837                      const struct GNUNET_PeerIdentity *sender,
838                      const struct GNUNET_MessageHeader *message,
839                      const struct GNUNET_ATS_Information*atsi)
840 {
841   struct GNUNET_STREAM_Socket *socket = cls;
842
843   return GNUNET_OK;
844 }
845
846
847 /**
848  * Common message handler for handling TRANSMIT_CLOSE messages
849  *
850  * @param socket the socket through which the ack was received
851  * @param tunnel connection to the other end
852  * @param sender who sent the message
853  * @param ack the acknowledgment message
854  * @param atsi performance data for the connection
855  * @return GNUNET_OK to keep the connection open,
856  *         GNUNET_SYSERR to close it (signal serious error)
857  */
858 static int
859 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
860                        struct GNUNET_MESH_Tunnel *tunnel,
861                        const struct GNUNET_PeerIdentity *sender,
862                        const struct GNUNET_STREAM_MessageHeader *msg,
863                        const struct GNUNET_ATS_Information*atsi)
864 {
865   struct GNUNET_STREAM_MessageHeader *reply;
866
867   switch (socket->state)
868     {
869     case STATE_ESTABLISHED:
870       socket->state = STATE_RECEIVE_CLOSED;
871
872       /* Send TRANSMIT_CLOSE_ACK */
873       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
874       reply->header.type = 
875         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
876       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
877       queue_message (socket, reply, NULL, NULL);
878       break;
879
880     default:
881       /* FIXME: Call statistics? */
882       break;
883     }
884   return GNUNET_YES;
885 }
886
887
888 /**
889  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
890  *
891  * @param cls the socket (set from GNUNET_MESH_connect)
892  * @param tunnel connection to the other end
893  * @param tunnel_ctx this is NULL
894  * @param sender who sent the message
895  * @param message the actual message
896  * @param atsi performance data for the connection
897  * @return GNUNET_OK to keep the connection open,
898  *         GNUNET_SYSERR to close it (signal serious error)
899  */
900 static int
901 client_handle_transmit_close (void *cls,
902                               struct GNUNET_MESH_Tunnel *tunnel,
903                               void **tunnel_ctx,
904                               const struct GNUNET_PeerIdentity *sender,
905                               const struct GNUNET_MessageHeader *message,
906                               const struct GNUNET_ATS_Information*atsi)
907 {
908   struct GNUNET_STREAM_Socket *socket = cls;
909   
910   return handle_transmit_close (socket,
911                                 tunnel,
912                                 sender,
913                                 (struct GNUNET_STREAM_MessageHeader *)message,
914                                 atsi);
915 }
916
917
918 /**
919  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
920  *
921  * @param cls the socket (set from GNUNET_MESH_connect)
922  * @param tunnel connection to the other end
923  * @param tunnel_ctx this is NULL
924  * @param sender who sent the message
925  * @param message the actual message
926  * @param atsi performance data for the connection
927  * @return GNUNET_OK to keep the connection open,
928  *         GNUNET_SYSERR to close it (signal serious error)
929  */
930 static int
931 client_handle_transmit_close_ack (void *cls,
932                                   struct GNUNET_MESH_Tunnel *tunnel,
933                                   void **tunnel_ctx,
934                                   const struct GNUNET_PeerIdentity *sender,
935                                   const struct GNUNET_MessageHeader *message,
936                                   const struct GNUNET_ATS_Information*atsi)
937 {
938   struct GNUNET_STREAM_Socket *socket = cls;
939
940   return GNUNET_OK;
941 }
942
943
944 /**
945  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
946  *
947  * @param cls the socket (set from GNUNET_MESH_connect)
948  * @param tunnel connection to the other end
949  * @param tunnel_ctx this is NULL
950  * @param sender who sent the message
951  * @param message the actual message
952  * @param atsi performance data for the connection
953  * @return GNUNET_OK to keep the connection open,
954  *         GNUNET_SYSERR to close it (signal serious error)
955  */
956 static int
957 client_handle_receive_close (void *cls,
958                              struct GNUNET_MESH_Tunnel *tunnel,
959                              void **tunnel_ctx,
960                              const struct GNUNET_PeerIdentity *sender,
961                              const struct GNUNET_MessageHeader *message,
962                              const struct GNUNET_ATS_Information*atsi)
963 {
964   struct GNUNET_STREAM_Socket *socket = cls;
965
966   return GNUNET_OK;
967 }
968
969
970 /**
971  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
972  *
973  * @param cls the socket (set from GNUNET_MESH_connect)
974  * @param tunnel connection to the other end
975  * @param tunnel_ctx this is NULL
976  * @param sender who sent the message
977  * @param message the actual message
978  * @param atsi performance data for the connection
979  * @return GNUNET_OK to keep the connection open,
980  *         GNUNET_SYSERR to close it (signal serious error)
981  */
982 static int
983 client_handle_receive_close_ack (void *cls,
984                                  struct GNUNET_MESH_Tunnel *tunnel,
985                                  void **tunnel_ctx,
986                                  const struct GNUNET_PeerIdentity *sender,
987                                  const struct GNUNET_MessageHeader *message,
988                                  const struct GNUNET_ATS_Information*atsi)
989 {
990   struct GNUNET_STREAM_Socket *socket = cls;
991
992   return GNUNET_OK;
993 }
994
995
996 /**
997  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
998  *
999  * @param cls the socket (set from GNUNET_MESH_connect)
1000  * @param tunnel connection to the other end
1001  * @param tunnel_ctx this is NULL
1002  * @param sender who sent the message
1003  * @param message the actual message
1004  * @param atsi performance data for the connection
1005  * @return GNUNET_OK to keep the connection open,
1006  *         GNUNET_SYSERR to close it (signal serious error)
1007  */
1008 static int
1009 client_handle_close (void *cls,
1010                      struct GNUNET_MESH_Tunnel *tunnel,
1011                      void **tunnel_ctx,
1012                      const struct GNUNET_PeerIdentity *sender,
1013                      const struct GNUNET_MessageHeader *message,
1014                      const struct GNUNET_ATS_Information*atsi)
1015 {
1016   struct GNUNET_STREAM_Socket *socket = cls;
1017
1018   return GNUNET_OK;
1019 }
1020
1021
1022 /**
1023  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1024  *
1025  * @param cls the socket (set from GNUNET_MESH_connect)
1026  * @param tunnel connection to the other end
1027  * @param tunnel_ctx this is NULL
1028  * @param sender who sent the message
1029  * @param message the actual message
1030  * @param atsi performance data for the connection
1031  * @return GNUNET_OK to keep the connection open,
1032  *         GNUNET_SYSERR to close it (signal serious error)
1033  */
1034 static int
1035 client_handle_close_ack (void *cls,
1036                          struct GNUNET_MESH_Tunnel *tunnel,
1037                          void **tunnel_ctx,
1038                          const struct GNUNET_PeerIdentity *sender,
1039                          const struct GNUNET_MessageHeader *message,
1040                          const struct GNUNET_ATS_Information*atsi)
1041 {
1042   struct GNUNET_STREAM_Socket *socket = cls;
1043
1044   return GNUNET_OK;
1045 }
1046
1047 /*****************************/
1048 /* Server's Message Handlers */
1049 /*****************************/
1050
1051 /**
1052  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1053  *
1054  * @param cls the closure
1055  * @param tunnel connection to the other end
1056  * @param tunnel_ctx the socket
1057  * @param sender who sent the message
1058  * @param message the actual message
1059  * @param atsi performance data for the connection
1060  * @return GNUNET_OK to keep the connection open,
1061  *         GNUNET_SYSERR to close it (signal serious error)
1062  */
1063 static int
1064 server_handle_data (void *cls,
1065                     struct GNUNET_MESH_Tunnel *tunnel,
1066                     void **tunnel_ctx,
1067                     const struct GNUNET_PeerIdentity *sender,
1068                     const struct GNUNET_MessageHeader *message,
1069                     const struct GNUNET_ATS_Information*atsi)
1070 {
1071   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1072
1073   return GNUNET_OK;
1074 }
1075
1076
1077 /**
1078  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1079  *
1080  * @param cls the closure
1081  * @param tunnel connection to the other end
1082  * @param tunnel_ctx the socket
1083  * @param sender who sent the message
1084  * @param message the actual message
1085  * @param atsi performance data for the connection
1086  * @return GNUNET_OK to keep the connection open,
1087  *         GNUNET_SYSERR to close it (signal serious error)
1088  */
1089 static int
1090 server_handle_hello (void *cls,
1091                      struct GNUNET_MESH_Tunnel *tunnel,
1092                      void **tunnel_ctx,
1093                      const struct GNUNET_PeerIdentity *sender,
1094                      const struct GNUNET_MessageHeader *message,
1095                      const struct GNUNET_ATS_Information*atsi)
1096 {
1097   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1098   struct GNUNET_STREAM_HelloAckMessage *reply;
1099
1100   GNUNET_assert (socket->tunnel == tunnel);
1101   if (STATE_INIT == socket->state)
1102     {
1103       /* Get the random sequence number */
1104       socket->write_sequence_number = 
1105         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1106       reply = 
1107         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1108       reply->header.header.size = 
1109         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1110       reply->header.header.type = 
1111         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1112       reply->sequence_number = htonl (socket->write_sequence_number);
1113       queue_message (socket, 
1114                      &reply->header,
1115                      &set_state_hello_wait, 
1116                      NULL);
1117     }
1118   else
1119     {
1120       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121                   "Client sent HELLO when in state %d\n", socket->state);
1122       /* FIXME: Send RESET? */
1123       
1124     }
1125   return GNUNET_OK;
1126 }
1127
1128
1129 /**
1130  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1131  *
1132  * @param cls the closure
1133  * @param tunnel connection to the other end
1134  * @param tunnel_ctx the socket
1135  * @param sender who sent the message
1136  * @param message the actual message
1137  * @param atsi performance data for the connection
1138  * @return GNUNET_OK to keep the connection open,
1139  *         GNUNET_SYSERR to close it (signal serious error)
1140  */
1141 static int
1142 server_handle_hello_ack (void *cls,
1143                          struct GNUNET_MESH_Tunnel *tunnel,
1144                          void **tunnel_ctx,
1145                          const struct GNUNET_PeerIdentity *sender,
1146                          const struct GNUNET_MessageHeader *message,
1147                          const struct GNUNET_ATS_Information*atsi)
1148 {
1149   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1150   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1151
1152   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1153   GNUNET_assert (socket->tunnel == tunnel);
1154   if (STATE_HELLO_WAIT == socket->state)
1155     {
1156       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1157       socket->receive_window_available = 
1158         ntohl (ack_message->receive_window_size);
1159       socket->state = STATE_ESTABLISHED;
1160     }
1161   else
1162     {
1163       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1165       /* FIXME: Send RESET? */
1166       
1167     }
1168   return GNUNET_OK;
1169 }
1170
1171
1172 /**
1173  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1174  *
1175  * @param cls the closure
1176  * @param tunnel connection to the other end
1177  * @param tunnel_ctx the socket
1178  * @param sender who sent the message
1179  * @param message the actual message
1180  * @param atsi performance data for the connection
1181  * @return GNUNET_OK to keep the connection open,
1182  *         GNUNET_SYSERR to close it (signal serious error)
1183  */
1184 static int
1185 server_handle_reset (void *cls,
1186                      struct GNUNET_MESH_Tunnel *tunnel,
1187                      void **tunnel_ctx,
1188                      const struct GNUNET_PeerIdentity *sender,
1189                      const struct GNUNET_MessageHeader *message,
1190                      const struct GNUNET_ATS_Information*atsi)
1191 {
1192   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1193
1194   return GNUNET_OK;
1195 }
1196
1197
1198 /**
1199  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1200  *
1201  * @param cls the closure
1202  * @param tunnel connection to the other end
1203  * @param tunnel_ctx the socket
1204  * @param sender who sent the message
1205  * @param message the actual message
1206  * @param atsi performance data for the connection
1207  * @return GNUNET_OK to keep the connection open,
1208  *         GNUNET_SYSERR to close it (signal serious error)
1209  */
1210 static int
1211 server_handle_transmit_close (void *cls,
1212                               struct GNUNET_MESH_Tunnel *tunnel,
1213                               void **tunnel_ctx,
1214                               const struct GNUNET_PeerIdentity *sender,
1215                               const struct GNUNET_MessageHeader *message,
1216                               const struct GNUNET_ATS_Information*atsi)
1217 {
1218   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1219
1220   return handle_transmit_close (socket,
1221                                 tunnel,
1222                                 sender,
1223                                 (struct GNUNET_STREAM_MessageHeader *)message,
1224                                 atsi);
1225 }
1226
1227
1228 /**
1229  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1230  *
1231  * @param cls the closure
1232  * @param tunnel connection to the other end
1233  * @param tunnel_ctx the socket
1234  * @param sender who sent the message
1235  * @param message the actual message
1236  * @param atsi performance data for the connection
1237  * @return GNUNET_OK to keep the connection open,
1238  *         GNUNET_SYSERR to close it (signal serious error)
1239  */
1240 static int
1241 server_handle_transmit_close_ack (void *cls,
1242                                   struct GNUNET_MESH_Tunnel *tunnel,
1243                                   void **tunnel_ctx,
1244                                   const struct GNUNET_PeerIdentity *sender,
1245                                   const struct GNUNET_MessageHeader *message,
1246                                   const struct GNUNET_ATS_Information*atsi)
1247 {
1248   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1249
1250   return GNUNET_OK;
1251 }
1252
1253
1254 /**
1255  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1256  *
1257  * @param cls the closure
1258  * @param tunnel connection to the other end
1259  * @param tunnel_ctx the socket
1260  * @param sender who sent the message
1261  * @param message the actual message
1262  * @param atsi performance data for the connection
1263  * @return GNUNET_OK to keep the connection open,
1264  *         GNUNET_SYSERR to close it (signal serious error)
1265  */
1266 static int
1267 server_handle_receive_close (void *cls,
1268                              struct GNUNET_MESH_Tunnel *tunnel,
1269                              void **tunnel_ctx,
1270                              const struct GNUNET_PeerIdentity *sender,
1271                              const struct GNUNET_MessageHeader *message,
1272                              const struct GNUNET_ATS_Information*atsi)
1273 {
1274   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1275
1276   return GNUNET_OK;
1277 }
1278
1279
1280 /**
1281  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1282  *
1283  * @param cls the closure
1284  * @param tunnel connection to the other end
1285  * @param tunnel_ctx the socket
1286  * @param sender who sent the message
1287  * @param message the actual message
1288  * @param atsi performance data for the connection
1289  * @return GNUNET_OK to keep the connection open,
1290  *         GNUNET_SYSERR to close it (signal serious error)
1291  */
1292 static int
1293 server_handle_receive_close_ack (void *cls,
1294                                  struct GNUNET_MESH_Tunnel *tunnel,
1295                                  void **tunnel_ctx,
1296                                  const struct GNUNET_PeerIdentity *sender,
1297                                  const struct GNUNET_MessageHeader *message,
1298                                  const struct GNUNET_ATS_Information*atsi)
1299 {
1300   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1301
1302   return GNUNET_OK;
1303 }
1304
1305
1306 /**
1307  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1308  *
1309  * @param cls the closure
1310  * @param tunnel connection to the other end
1311  * @param tunnel_ctx the socket
1312  * @param sender who sent the message
1313  * @param message the actual message
1314  * @param atsi performance data for the connection
1315  * @return GNUNET_OK to keep the connection open,
1316  *         GNUNET_SYSERR to close it (signal serious error)
1317  */
1318 static int
1319 server_handle_close (void *cls,
1320                      struct GNUNET_MESH_Tunnel *tunnel,
1321                      void **tunnel_ctx,
1322                      const struct GNUNET_PeerIdentity *sender,
1323                      const struct GNUNET_MessageHeader *message,
1324                      const struct GNUNET_ATS_Information*atsi)
1325 {
1326   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1327
1328   return GNUNET_OK;
1329 }
1330
1331
1332 /**
1333  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1334  *
1335  * @param cls the closure
1336  * @param tunnel connection to the other end
1337  * @param tunnel_ctx the socket
1338  * @param sender who sent the message
1339  * @param message the actual message
1340  * @param atsi performance data for the connection
1341  * @return GNUNET_OK to keep the connection open,
1342  *         GNUNET_SYSERR to close it (signal serious error)
1343  */
1344 static int
1345 server_handle_close_ack (void *cls,
1346                          struct GNUNET_MESH_Tunnel *tunnel,
1347                          void **tunnel_ctx,
1348                          const struct GNUNET_PeerIdentity *sender,
1349                          const struct GNUNET_MessageHeader *message,
1350                          const struct GNUNET_ATS_Information*atsi)
1351 {
1352   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1353
1354   return GNUNET_OK;
1355 }
1356
1357
1358 /**
1359  * Message Handler for mesh
1360  *
1361  * @param socket the socket through which the ack was received
1362  * @param tunnel connection to the other end
1363  * @param sender who sent the message
1364  * @param ack the acknowledgment message
1365  * @param atsi performance data for the connection
1366  * @return GNUNET_OK to keep the connection open,
1367  *         GNUNET_SYSERR to close it (signal serious error)
1368  */
1369 static int
1370 handle_ack (struct GNUNET_STREAM_Socket *socket,
1371             struct GNUNET_MESH_Tunnel *tunnel,
1372             const struct GNUNET_PeerIdentity *sender,
1373             const struct GNUNET_STREAM_AckMessage *ack,
1374             const struct GNUNET_ATS_Information*atsi)
1375 {
1376   switch (socket->state)
1377     {
1378     case (STATE_ESTABLISHED):
1379       if (NULL == socket->write_handle)
1380         {
1381           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1382                       "Received DATA ACK when write_handle is NULL\n");
1383           return GNUNET_OK;
1384         }
1385
1386       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1387       socket->write_handle->receive_window_available = 
1388         ntohl (ack->receive_window_remaining);
1389       write_data (socket);
1390       break;
1391     default:
1392       break;
1393     }
1394   return GNUNET_OK;
1395 }
1396
1397
1398 /**
1399  * Message Handler for mesh
1400  *
1401  * @param cls the 'struct GNUNET_STREAM_Socket'
1402  * @param tunnel connection to the other end
1403  * @param tunnel_ctx unused
1404  * @param sender who sent the message
1405  * @param message the actual message
1406  * @param atsi performance data for the connection
1407  * @return GNUNET_OK to keep the connection open,
1408  *         GNUNET_SYSERR to close it (signal serious error)
1409  */
1410 static int
1411 client_handle_ack (void *cls,
1412                    struct GNUNET_MESH_Tunnel *tunnel,
1413                    void **tunnel_ctx,
1414                    const struct GNUNET_PeerIdentity *sender,
1415                    const struct GNUNET_MessageHeader *message,
1416                    const struct GNUNET_ATS_Information*atsi)
1417 {
1418   struct GNUNET_STREAM_Socket *socket = cls;
1419   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1420  
1421   return handle_ack (socket, tunnel, sender, ack, atsi);
1422 }
1423
1424
1425 /**
1426  * Message Handler for mesh
1427  *
1428  * @param cls the server's listen socket
1429  * @param tunnel connection to the other end
1430  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1431  * @param sender who sent the message
1432  * @param message the actual message
1433  * @param atsi performance data for the connection
1434  * @return GNUNET_OK to keep the connection open,
1435  *         GNUNET_SYSERR to close it (signal serious error)
1436  */
1437 static int
1438 server_handle_ack (void *cls,
1439                    struct GNUNET_MESH_Tunnel *tunnel,
1440                    void **tunnel_ctx,
1441                    const struct GNUNET_PeerIdentity *sender,
1442                    const struct GNUNET_MessageHeader *message,
1443                    const struct GNUNET_ATS_Information*atsi)
1444 {
1445   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1446   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1447  
1448   return handle_ack (socket, tunnel, sender, ack, atsi);
1449 }
1450
1451
1452 /**
1453  * For client message handlers, the stream socket is in the
1454  * closure argument.
1455  */
1456 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1457   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1458   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1459    sizeof (struct GNUNET_STREAM_AckMessage) },
1460   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1461    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1462   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1463    sizeof (struct GNUNET_STREAM_MessageHeader)},
1464   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1465    sizeof (struct GNUNET_STREAM_MessageHeader)},
1466   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1467    sizeof (struct GNUNET_STREAM_MessageHeader)},
1468   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1469    sizeof (struct GNUNET_STREAM_MessageHeader)},
1470   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1471    sizeof (struct GNUNET_STREAM_MessageHeader)},
1472   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1473    sizeof (struct GNUNET_STREAM_MessageHeader)},
1474   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1475    sizeof (struct GNUNET_STREAM_MessageHeader)},
1476   {NULL, 0, 0}
1477 };
1478
1479
1480 /**
1481  * For server message handlers, the stream socket is in the
1482  * tunnel context, and the listen socket in the closure argument.
1483  */
1484 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1485   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1486   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1487    sizeof (struct GNUNET_STREAM_AckMessage) },
1488   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1489    sizeof (struct GNUNET_STREAM_MessageHeader)},
1490   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1491    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1492   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1493    sizeof (struct GNUNET_STREAM_MessageHeader)},
1494   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1495    sizeof (struct GNUNET_STREAM_MessageHeader)},
1496   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1497    sizeof (struct GNUNET_STREAM_MessageHeader)},
1498   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1499    sizeof (struct GNUNET_STREAM_MessageHeader)},
1500   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1501    sizeof (struct GNUNET_STREAM_MessageHeader)},
1502   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1503    sizeof (struct GNUNET_STREAM_MessageHeader)},
1504   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1505    sizeof (struct GNUNET_STREAM_MessageHeader)},
1506   {NULL, 0, 0}
1507 };
1508
1509
1510 /**
1511  * Function called when our target peer is connected to our tunnel
1512  *
1513  * @param cls the socket for which this tunnel is created
1514  * @param peer the peer identity of the target
1515  * @param atsi performance data for the connection
1516  */
1517 static void
1518 mesh_peer_connect_callback (void *cls,
1519                             const struct GNUNET_PeerIdentity *peer,
1520                             const struct GNUNET_ATS_Information * atsi)
1521 {
1522   struct GNUNET_STREAM_Socket *socket = cls;
1523   struct GNUNET_STREAM_MessageHeader *message;
1524
1525   if (0 != memcmp (&socket->other_peer, 
1526                    peer, 
1527                    sizeof (struct GNUNET_PeerIdentity)))
1528     {
1529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530                   "A peer (%s) which is not our target has connected to our tunnel", 
1531                   GNUNET_i2s (peer));
1532       return;
1533     }
1534   
1535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536               "Target peer %s connected\n", GNUNET_i2s (peer));
1537   
1538   /* Set state to INIT */
1539   socket->state = STATE_INIT;
1540
1541   /* Send HELLO message */
1542   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1543   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1544   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1545   queue_message (socket,
1546                  message,
1547                  &set_state_hello_wait,
1548                  NULL);
1549
1550   /* Call open callback */
1551   if (NULL == socket->open_cb)
1552     {
1553       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1554                   "STREAM_open callback is NULL\n");
1555     }
1556   else
1557     {
1558       socket->open_cb (socket->open_cls, socket);
1559     }
1560 }
1561
1562
1563 /**
1564  * Function called when our target peer is disconnected from our tunnel
1565  *
1566  * @param cls the socket associated which this tunnel
1567  * @param peer the peer identity of the target
1568  */
1569 static void
1570 mesh_peer_disconnect_callback (void *cls,
1571                                const struct GNUNET_PeerIdentity *peer)
1572 {
1573
1574 }
1575
1576
1577 /*****************/
1578 /* API functions */
1579 /*****************/
1580
1581
1582 /**
1583  * Tries to open a stream to the target peer
1584  *
1585  * @param cfg configuration to use
1586  * @param target the target peer to which the stream has to be opened
1587  * @param app_port the application port number which uniquely identifies this
1588  *            stream
1589  * @param open_cb this function will be called after stream has be established 
1590  * @param open_cb_cls the closure for open_cb
1591  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1592  * @return if successful it returns the stream socket; NULL if stream cannot be
1593  *         opened 
1594  */
1595 struct GNUNET_STREAM_Socket *
1596 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1597                     const struct GNUNET_PeerIdentity *target,
1598                     GNUNET_MESH_ApplicationType app_port,
1599                     GNUNET_STREAM_OpenCallback open_cb,
1600                     void *open_cb_cls,
1601                     ...)
1602 {
1603   struct GNUNET_STREAM_Socket *socket;
1604   enum GNUNET_STREAM_Option option;
1605   va_list vargs;                /* Variable arguments */
1606
1607   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1608   socket->other_peer = *target;
1609   socket->open_cb = open_cb;
1610   socket->open_cls = open_cb_cls;
1611
1612   /* Set defaults */
1613   socket->retransmit_timeout = 
1614     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1615
1616   va_start (vargs, open_cb_cls); /* Parse variable args */
1617   do {
1618     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1619     switch (option)
1620       {
1621       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1622         /* Expect struct GNUNET_TIME_Relative */
1623         socket->retransmit_timeout = va_arg (vargs,
1624                                              struct GNUNET_TIME_Relative);
1625         break;
1626       case GNUNET_STREAM_OPTION_END:
1627         break;
1628       }
1629   } while (GNUNET_STREAM_OPTION_END != option);
1630   va_end (vargs);               /* End of variable args parsing */
1631
1632   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1633                                       1,  /* QUEUE size as parameter? */
1634                                       socket, /* cls */
1635                                       NULL, /* No inbound tunnel handler */
1636                                       NULL, /* No inbound tunnel cleaner */
1637                                       client_message_handlers,
1638                                       NULL); /* We don't get inbound tunnels */
1639   // FIXME: if (NULL == socket->mesh) ...
1640
1641   /* Now create the mesh tunnel to target */
1642   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1643                                               NULL, /* Tunnel context */
1644                                               &mesh_peer_connect_callback,
1645                                               &mesh_peer_disconnect_callback,
1646                                               socket);
1647   // FIXME: if (NULL == socket->tunnel) ...
1648
1649   return socket;
1650 }
1651
1652
1653 /**
1654  * Closes the stream
1655  *
1656  * @param socket the stream socket
1657  */
1658 void
1659 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1660 {
1661   struct MessageQueue *head;
1662
1663   /* Clear Transmit handles */
1664   if (NULL != socket->transmit_handle)
1665     {
1666       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1667       socket->transmit_handle = NULL;
1668     }
1669
1670   /* Clear existing message queue */
1671   while (NULL != (head = socket->queue_head)) {
1672     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1673                                  socket->queue_tail,
1674                                  head);
1675     GNUNET_free (head->message);
1676     GNUNET_free (head);
1677   }
1678
1679   /* Close associated tunnel */
1680   if (NULL != socket->tunnel)
1681     {
1682       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1683       socket->tunnel = NULL;
1684     }
1685
1686   /* Close mesh connection */
1687   if (NULL != socket->mesh)
1688     {
1689       GNUNET_MESH_disconnect (socket->mesh);
1690       socket->mesh = NULL;
1691     }
1692   
1693   /* Release receive buffer */
1694   if (NULL != socket->receive_buffer)
1695     {
1696       GNUNET_free (socket->receive_buffer);
1697     }
1698
1699   GNUNET_free (socket);
1700 }
1701
1702
1703 /**
1704  * Method called whenever a peer creates a tunnel to us
1705  *
1706  * @param cls closure
1707  * @param tunnel new handle to the tunnel
1708  * @param initiator peer that started the tunnel
1709  * @param atsi performance information for the tunnel
1710  * @return initial tunnel context for the tunnel
1711  *         (can be NULL -- that's not an error)
1712  */
1713 static void *
1714 new_tunnel_notify (void *cls,
1715                    struct GNUNET_MESH_Tunnel *tunnel,
1716                    const struct GNUNET_PeerIdentity *initiator,
1717                    const struct GNUNET_ATS_Information *atsi)
1718 {
1719   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1720   struct GNUNET_STREAM_Socket *socket;
1721
1722   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1723   socket->tunnel = tunnel;
1724   socket->session_id = 0;       /* FIXME */
1725   socket->other_peer = *initiator;
1726   socket->state = STATE_INIT;
1727
1728   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1729                                            socket,
1730                                            &socket->other_peer))
1731     {
1732       socket->state = STATE_CLOSED;
1733       /* FIXME: Send CLOSE message and then free */
1734       GNUNET_free (socket);
1735       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1736     }
1737   return socket;
1738 }
1739
1740
1741 /**
1742  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1743  * any associated state.  This function is NOT called if the client has
1744  * explicitly asked for the tunnel to be destroyed using
1745  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1746  * the tunnel.
1747  *
1748  * @param cls closure (set from GNUNET_MESH_connect)
1749  * @param tunnel connection to the other end (henceforth invalid)
1750  * @param tunnel_ctx place where local state associated
1751  *                   with the tunnel is stored
1752  */
1753 static void 
1754 tunnel_cleaner (void *cls,
1755                 const struct GNUNET_MESH_Tunnel *tunnel,
1756                 void *tunnel_ctx)
1757 {
1758   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1759   
1760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1761               "Peer %s has terminated connection abruptly\n",
1762               GNUNET_i2s (&socket->other_peer));
1763
1764   socket->status = GNUNET_STREAM_SHUTDOWN;
1765   /* Clear Transmit handles */
1766   if (NULL != socket->transmit_handle)
1767     {
1768       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1769       socket->transmit_handle = NULL;
1770     }
1771   socket->tunnel = NULL;
1772 }
1773
1774
1775 /**
1776  * Listens for stream connections for a specific application ports
1777  *
1778  * @param cfg the configuration to use
1779  * @param app_port the application port for which new streams will be accepted
1780  * @param listen_cb this function will be called when a peer tries to establish
1781  *            a stream with us
1782  * @param listen_cb_cls closure for listen_cb
1783  * @return listen socket, NULL for any error
1784  */
1785 struct GNUNET_STREAM_ListenSocket *
1786 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1787                       GNUNET_MESH_ApplicationType app_port,
1788                       GNUNET_STREAM_ListenCallback listen_cb,
1789                       void *listen_cb_cls)
1790 {
1791   /* FIXME: Add variable args for passing configration options? */
1792   struct GNUNET_STREAM_ListenSocket *lsocket;
1793   GNUNET_MESH_ApplicationType app_types[2];
1794
1795   app_types[0] = app_port;
1796   app_types[1] = 0;
1797   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1798   lsocket->port = app_port;
1799   lsocket->listen_cb = listen_cb;
1800   lsocket->listen_cb_cls = listen_cb_cls;
1801   lsocket->mesh = GNUNET_MESH_connect (cfg,
1802                                        10, /* FIXME: QUEUE size as parameter? */
1803                                        lsocket, /* Closure */
1804                                        &new_tunnel_notify,
1805                                        &tunnel_cleaner,
1806                                        server_message_handlers,
1807                                        app_types);
1808   return lsocket;
1809 }
1810
1811
1812 /**
1813  * Closes the listen socket
1814  *
1815  * @param lsocket the listen socket
1816  */
1817 void
1818 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1819 {
1820   /* Close MESH connection */
1821   GNUNET_MESH_disconnect (lsocket->mesh);
1822   
1823   GNUNET_free (lsocket);
1824 }
1825
1826
1827 /**
1828  * Tries to write the given data to the stream
1829  *
1830  * @param socket the socket representing a stream
1831  * @param data the data buffer from where the data is written into the stream
1832  * @param size the number of bytes to be written from the data buffer
1833  * @param timeout the timeout period
1834  * @param write_cont the function to call upon writing some bytes into the stream
1835  * @param write_cont_cls the closure
1836  * @return handle to cancel the operation
1837  */
1838 struct GNUNET_STREAM_IOHandle *
1839 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1840                      const void *data,
1841                      size_t size,
1842                      struct GNUNET_TIME_Relative timeout,
1843                      GNUNET_STREAM_CompletionContinuation write_cont,
1844                      void *write_cont_cls)
1845 {
1846   unsigned int num_needed_packets;
1847   unsigned int packet;
1848   struct GNUNET_STREAM_IOHandle *io_handle;
1849   size_t packet_size;
1850   struct GNUNET_STREAM_DataMessage *data_msg;
1851   const void *sweep;
1852
1853   /* There is already a write request pending */
1854   if (NULL != socket->write_handle) return NULL;
1855   if (!(STATE_ESTABLISHED == socket->state 
1856         || STATE_RECEIVE_CLOSE_WAIT == socket->state
1857         || STATE_RECEIVE_CLOSED == socket->state))
1858     {
1859       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1860                   "Attempting to write on a closed (OR) not-yet-established"
1861                   "stream\n"); 
1862       return NULL;
1863     }
1864       
1865   num_needed_packets = ceil (size / max_payload_size);
1866   if (64 < num_needed_packets) 
1867     {
1868       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1869                   "Given buffer cannot be accommodated in 64 packets\n");
1870       num_needed_packets = 64;
1871     }
1872
1873   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
1874   io_handle->receive_window_available = socket->receive_window_available;
1875   sweep = data;
1876   /* Divide the given buffer into packets for sending */
1877   for (packet=0; packet < num_needed_packets; packet++)
1878     {
1879       if ((packet + 1) * max_payload_size < size) 
1880         {
1881           packet_size = MAX_PACKET_SIZE;
1882         }
1883       else 
1884         {
1885           packet_size = size - packet * max_payload_size
1886             + sizeof (struct GNUNET_STREAM_DataMessage);
1887         }
1888       io_handle->messages[packet] = GNUNET_malloc (packet_size);
1889       io_handle->messages[packet]->header.header.size = htons (packet_size);
1890       io_handle->messages[packet]->header.header.type =
1891         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
1892       io_handle->messages[packet]->sequence_number =
1893         htons (socket->write_sequence_number++);
1894       data_msg = io_handle->messages[packet];
1895       memcpy (&data_msg[1],
1896               sweep,
1897               packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
1898       sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
1899     }
1900
1901   write_data (socket);
1902
1903   return io_handle;
1904 }