65836ce1c52c52458ce199b644d4ad3a7270bb8b
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param cls the closure from queue_message
110  * @param socket the socket the written message was bound to
111  */
112 typedef void (*SendFinishCallback) (void *cls,
113                                     struct GNUNET_STREAM_Socket *socket);
114
115
116 /**
117  * The send message queue
118  */
119 struct MessageQueue
120 {
121   /**
122    * The message
123    */
124   struct GNUNET_STREAM_MessageHeader *message;
125
126   /**
127    * Callback to be called when the message is sent
128    */
129   SendFinishCallback finish_cb;
130
131   /**
132    * The closure for finish_cb
133    */
134   void *finish_cb_cls;
135
136   /**
137    * The next message in queue. Should be NULL in the last message
138    */
139   struct MessageQueue *next;
140
141   /**
142    * The next message in queue. Should be NULL in the last message
143    */
144   struct MessageQueue *prev;
145 };
146
147
148 /**
149  * The STREAM Socket Handler
150  */
151 struct GNUNET_STREAM_Socket
152 {
153
154   /**
155    * The peer identity of the peer at the other end of the stream
156    */
157   struct GNUNET_PeerIdentity other_peer;
158
159   /**
160    * Retransmission timeout
161    */
162   struct GNUNET_TIME_Relative retransmit_timeout;
163
164   /**
165    * The Acknowledgement Bitmap
166    */
167   GNUNET_STREAM_AckBitmap ack_bitmap;
168
169   /**
170    * Time when the Acknowledgement was queued
171    */
172   struct GNUNET_TIME_Absolute ack_time_registered;
173
174   /**
175    * Queued Acknowledgement deadline
176    */
177   struct GNUNET_TIME_Relative ack_time_deadline;
178
179   /**
180    * The task for sending timely Acks
181    */
182   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
183
184   /**
185    * The mesh handle
186    */
187   struct GNUNET_MESH_Handle *mesh;
188
189   /**
190    * The mesh tunnel handle
191    */
192   struct GNUNET_MESH_Tunnel *tunnel;
193
194   /**
195    * Stream open closure
196    */
197   void *open_cls;
198
199   /**
200    * Stream open callback
201    */
202   GNUNET_STREAM_OpenCallback open_cb;
203
204   /**
205    * The current transmit handle (if a pending transmit request exists)
206    */
207   struct GNUNET_MESH_TransmitHandle *transmit_handle;
208
209   /**
210    * The current message associated with the transmit handle
211    */
212   struct MessageQueue *queue_head;
213
214   /**
215    * The queue tail, should always point to the last message in queue
216    */
217   struct MessageQueue *queue_tail;
218
219   /**
220    * The write IO_handle associated with this socket
221    */
222   struct GNUNET_STREAM_IOHandle *write_handle;
223
224   /**
225    * The read IO_handle associated with this socket
226    */
227   struct GNUNET_STREAM_IOHandle *read_handle;
228
229   /**
230    * Buffer for storing received messages
231    */
232   void *receive_buffer;
233
234   /**
235    * The state of the protocol associated with this socket
236    */
237   enum State state;
238
239   /**
240    * The status of the socket
241    */
242   enum GNUNET_STREAM_Status status;
243
244   /**
245    * The number of previous timeouts; FIXME: currently not used
246    */
247   unsigned int retries;
248
249   /**
250    * The session id associated with this stream connection
251    * FIXME: Not used currently, may be removed
252    */
253   uint32_t session_id;
254
255   /**
256    * Write sequence number. Set to random when sending HELLO(client) and
257    * HELLO_ACK(server) 
258    */
259   uint32_t write_sequence_number;
260
261   /**
262    * Read sequence number. This number's value is determined during handshake
263    */
264   uint32_t read_sequence_number;
265
266   /**
267    * receiver's available buffer after the last acknowledged packet
268    */
269   uint32_t receive_window_available;
270 };
271
272
273 /**
274  * A socket for listening
275  */
276 struct GNUNET_STREAM_ListenSocket
277 {
278
279   /**
280    * The mesh handle
281    */
282   struct GNUNET_MESH_Handle *mesh;
283
284   /**
285    * The callback function which is called after successful opening socket
286    */
287   GNUNET_STREAM_ListenCallback listen_cb;
288
289   /**
290    * The call back closure
291    */
292   void *listen_cb_cls;
293
294   /**
295    * The service port
296    */
297   GNUNET_MESH_ApplicationType port;
298 };
299
300
301
302
303 /**
304  * The IO Handle
305  */
306 struct GNUNET_STREAM_IOHandle
307 {
308   /**
309    * The packet_buffers associated with this Handle
310    */
311   struct GNUNET_STREAM_DataMessage *messages[64];
312
313   /**
314    * The bitmap of this IOHandle; Corresponding bit for a message is set when
315    * it has been acknowledged by the receiver
316    */
317   GNUNET_STREAM_AckBitmap ack_bitmap;
318
319   /**
320    * receiver's available buffer
321    */
322   uint32_t receive_window_available;
323
324   /**
325    * Number of packets sent before waiting for an ack
326    *
327    * FIXME: Do we need this?
328    */
329   unsigned int sent_packets;
330 };
331
332
333 /**
334  * Default value in seconds for various timeouts
335  */
336 static unsigned int default_timeout = 300;
337
338
339 /**
340  * Callback function for sending hello message
341  *
342  * @param cls closure the socket
343  * @param size number of bytes available in buf
344  * @param buf where the callee should write the message
345  * @return number of bytes written to buf
346  */
347 static size_t
348 send_message_notify (void *cls, size_t size, void *buf)
349 {
350   struct GNUNET_STREAM_Socket *socket = cls;
351   struct MessageQueue *head;
352   size_t ret;
353
354   socket->transmit_handle = NULL; /* Remove the transmit handle */
355   head = socket->queue_head;
356   if (NULL == head)
357     return 0; /* just to be safe */
358   if (0 == size)                /* request timed out */
359     {
360       socket->retries++;
361       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
362                   "Message sending timed out. Retry %d \n",
363                   socket->retries);
364       socket->transmit_handle = 
365         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
366                                            0, /* Corking */
367                                            1, /* Priority */
368                                            /* FIXME: exponential backoff */
369                                            socket->retransmit_timeout,
370                                            &socket->other_peer,
371                                            ntohs (head->message->header.size),
372                                            &send_message_notify,
373                                            socket);
374       return 0;
375     }
376
377   ret = ntohs (head->message->header.size);
378   GNUNET_assert (size >= ret);
379   memcpy (buf, head->message, ret);
380   if (NULL != head->finish_cb)
381     {
382       head->finish_cb (socket, head->finish_cb_cls);
383     }
384   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
385                                socket->queue_tail,
386                                head);
387   GNUNET_free (head->message);
388   GNUNET_free (head);
389   head = socket->queue_head;
390   if (NULL != head)    /* more pending messages to send */
391     {
392       socket->retries = 0;
393       socket->transmit_handle = 
394         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
395                                            0, /* Corking */
396                                            1, /* Priority */
397                                            /* FIXME: exponential backoff */
398                                            socket->retransmit_timeout,
399                                            &socket->other_peer,
400                                            ntohs (head->message->header.size),
401                                            &send_message_notify,
402                                            socket);
403     }
404   return ret;
405 }
406
407
408 /**
409  * Queues a message for sending using the mesh connection of a socket
410  *
411  * @param socket the socket whose mesh connection is used
412  * @param message the message to be sent
413  * @param finish_cb the callback to be called when the message is sent
414  * @param finish_cb_cls the closure for the callback
415  */
416 static void
417 queue_message (struct GNUNET_STREAM_Socket *socket,
418                struct GNUNET_STREAM_MessageHeader *message,
419                SendFinishCallback finish_cb,
420                void *finish_cb_cls)
421 {
422   struct MessageQueue *queue_entity;
423
424   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
425   queue_entity->message = message;
426   queue_entity->finish_cb = finish_cb;
427   queue_entity->finish_cb_cls = finish_cb_cls;
428   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
429                                     socket->queue_tail,
430                                     queue_entity);
431   if (NULL == socket->transmit_handle)
432   {
433     socket->retries = 0;
434     socket->transmit_handle = 
435       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
436                                          0, /* Corking */
437                                          1, /* Priority */
438                                          socket->retransmit_timeout,
439                                          &socket->other_peer,
440                                          ntohs (message->header.size),
441                                          &send_message_notify,
442                                          socket);
443   }
444 }
445
446
447 /**
448  * Callback function for sending ack message
449  *
450  * @param cls closure the ACK message created in ack_task
451  * @param size number of bytes available in buffer
452  * @param buf where the callee should write the message
453  * @return number of bytes written to buf
454  */
455 static size_t
456 send_ack_notify (void *cls, size_t size, void *buf)
457 {
458   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
459
460   if (0 == size)
461     {
462       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463                   "%s called with size 0\n", __func__);
464       return 0;
465     }
466   GNUNET_assert (ack_msg->header.header.size <= size);
467   
468   size = ack_msg->header.header.size;
469   memcpy (buf, ack_msg, size);
470   return size;
471 }
472
473
474 /**
475  * Task for sending ACK message
476  *
477  * @param cls the socket
478  * @param tc the Task context
479  */
480 static void
481 ack_task (void *cls,
482           const struct GNUNET_SCHEDULER_TaskContext *tc)
483 {
484   struct GNUNET_STREAM_Socket *socket = cls;
485   struct GNUNET_STREAM_AckMessage *ack_msg;
486
487   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
488     {
489       return;
490     }
491
492   socket->ack_task_id = 0;
493
494   /* Create the ACK Message */
495   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
496   ack_msg->header.header.size = htons (sizeof (struct 
497                                                GNUNET_STREAM_AckMessage));
498   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
499   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
500   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
501   ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
502
503   /* Request MESH for sending ACK */
504   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
505                                      0, /* Corking */
506                                      1, /* Priority */
507                                      socket->retransmit_timeout,
508                                      &socket->other_peer,
509                                      ntohs (ack_msg->header.header.size),
510                                      &send_ack_notify,
511                                      ack_msg);
512
513   
514 }
515
516
517 /**
518  * Function to modify a bit in GNUNET_STREAM_AckBitmap
519  *
520  * @param bitmap the bitmap to modify
521  * @param bit the bit number to modify
522  * @param value GNUNET_YES to on, GNUNET_NO to off
523  */
524 static void
525 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
526                       unsigned int bit, 
527                       int value)
528 {
529   GNUNET_assert (bit < 64);
530   if (GNUNET_YES == value)
531     *bitmap |= (1LL << bit);
532   else
533     *bitmap &= ~(1LL << bit);
534 }
535
536
537 /**
538  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
539  *
540  * @param bitmap address of the bitmap that has to be checked
541  * @param bit the bit number to check
542  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
543  */
544 static uint8_t
545 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
546                       unsigned int bit)
547 {
548   GNUNET_assert (bit < 64);
549   return 0 != (*bitmap & (1LL << bit));
550 }
551
552
553
554 /**
555  * Function called when Data Message is sent
556  *
557  * @param cls the io_handle corresponding to the Data Message
558  * @param socket the socket which was used
559  */
560 static void
561 write_data_finish_cb (void *cls,
562                       struct GNUNET_STREAM_Socket *socket)
563 {
564   struct GNUNET_STREAM_IOHandle *io_handle = cls;
565
566   io_handle->sent_packets++;
567 }
568
569
570 /**
571  * Writes data using the given socket. The amount of data written is limited by
572  * the receive_window_size
573  *
574  * @param socket the socket to use
575  */
576 static void 
577 write_data (struct GNUNET_STREAM_Socket *socket)
578 {
579   struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
580   unsigned int packet;
581   int ack_packet;
582
583   ack_packet = -1;
584   /* Find the last acknowledged packet */
585   for (packet=0; packet < 64; packet++)
586     {      
587       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
588                                               packet))
589         ack_packet = packet;        
590       else if (NULL == io_handle->messages[packet])
591         break;
592     }
593   /* Resend packets which weren't ack'ed */
594   for (packet=0; packet < ack_packet; packet++)
595     {
596       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
597                                              packet))
598         {
599           queue_message (socket,
600                          &io_handle->messages[packet]->header,
601                          NULL,
602                          NULL);
603         }
604     }
605   packet = ack_packet + 1;
606   /* Now send new packets if there is enough buffer space */
607   while ( (NULL != io_handle->messages[packet]) &&
608           (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
609     {
610       io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
611       queue_message (socket,
612                      &io_handle->messages[packet]->header,
613                      &write_data_finish_cb,
614                      io_handle);
615       packet++;
616     }
617 }
618
619
620 /**
621  * Handler for DATA messages; Same for both client and server
622  *
623  * @param socket the socket through which the ack was received
624  * @param tunnel connection to the other end
625  * @param sender who sent the message
626  * @param msg the data message
627  * @param atsi performance data for the connection
628  * @return GNUNET_OK to keep the connection open,
629  *         GNUNET_SYSERR to close it (signal serious error)
630  */
631 static int
632 handle_data (struct GNUNET_STREAM_Socket *socket,
633              struct GNUNET_MESH_Tunnel *tunnel,
634              const struct GNUNET_PeerIdentity *sender,
635              const struct GNUNET_STREAM_DataMessage *msg,
636              const struct GNUNET_ATS_Information*atsi)
637 {
638   uint16_t size;
639   const void *payload;
640
641   size = msg->header.header.size;
642   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
643     {
644       GNUNET_break_op (0);
645       return GNUNET_SYSERR;
646     }
647
648   switch (socket->state)
649     {
650     case STATE_ESTABLISHED:
651     case STATE_TRANSMIT_CLOSED:
652     case STATE_TRANSMIT_CLOSE_WAIT:
653       GNUNET_assert (NULL != socket->receive_buffer);
654       /* check if the message's sequence number is greater than the one we are
655          expecting */
656       if (ntohl (msg->sequence_number) - socket->read_sequence_number >= 64)
657         {/* We are receiving a retransmitted message */
658           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659                       "Message with sequence number %d retransmitted\n",
660                       ntohl (socket->read_sequence_number));
661           return GNUNET_YES;
662         }
663
664       /* Copy Data to buffer and send acknowledgement for this packet */
665       size -= sizeof (struct GNUNET_STREAM_DataMessage);
666       payload = &msg[1];
667       memcpy (socket->receive_buffer 
668               + (ntohl (msg->sequence_number) - socket->read_sequence_number)
669               * MAX_PACKET_SIZE,
670               payload,
671               size);
672       
673       /* Modify the ACK bitmap */
674       ackbitmap_modify_bit (&socket->ack_bitmap,
675                             ntohl (msg->sequence_number) -
676                             socket->read_sequence_number,
677                             GNUNET_YES);
678
679       /* Start ACK sending task if one is not already present */
680       if (0 == socket->ack_task_id)
681        {
682          socket->ack_task_id = 
683            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
684                                          (msg->ack_deadline),
685                                          &ack_task,
686                                          socket);
687        }
688       
689       break;
690
691     default:
692       /* FIXME: call statistics */
693       break;
694     }
695   return GNUNET_YES;
696 }
697
698 /**
699  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
700  *
701  * @param cls the socket (set from GNUNET_MESH_connect)
702  * @param tunnel connection to the other end
703  * @param tunnel_ctx place to store local state associated with the tunnel
704  * @param sender who sent the message
705  * @param message the actual message
706  * @param atsi performance data for the connection
707  * @return GNUNET_OK to keep the connection open,
708  *         GNUNET_SYSERR to close it (signal serious error)
709  */
710 static int
711 client_handle_data (void *cls,
712              struct GNUNET_MESH_Tunnel *tunnel,
713              void **tunnel_ctx,
714              const struct GNUNET_PeerIdentity *sender,
715              const struct GNUNET_MessageHeader *message,
716              const struct GNUNET_ATS_Information*atsi)
717 {
718   struct GNUNET_STREAM_Socket *socket = cls;
719
720   return handle_data (socket, 
721                       tunnel, 
722                       sender, 
723                       (const struct GNUNET_STREAM_DataMessage *) message, 
724                       atsi);
725 }
726
727
728 /**
729  * Callback to set state to ESTABLISHED
730  *
731  * @param cls the closure from queue_message FIXME: document
732  * @param socket the socket to requiring state change
733  */
734 static void
735 set_state_established (void *cls,
736                        struct GNUNET_STREAM_Socket *socket)
737 {
738   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
739   /* Initialize the receive buffer */
740   socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE);
741   socket->state = STATE_ESTABLISHED;
742 }
743
744
745 /**
746  * Callback to set state to HELLO_WAIT
747  *
748  * @param cls the closure from queue_message
749  * @param socket the socket to requiring state change
750  */
751 static void
752 set_state_hello_wait (void *cls,
753                       struct GNUNET_STREAM_Socket *socket)
754 {
755   GNUNET_assert (STATE_INIT == socket->state);
756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
757   socket->state = STATE_HELLO_WAIT;
758 }
759
760
761 /**
762  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
763  *
764  * @param cls the socket (set from GNUNET_MESH_connect)
765  * @param tunnel connection to the other end
766  * @param tunnel_ctx this is NULL
767  * @param sender who sent the message
768  * @param message the actual message
769  * @param atsi performance data for the connection
770  * @return GNUNET_OK to keep the connection open,
771  *         GNUNET_SYSERR to close it (signal serious error)
772  */
773 static int
774 client_handle_hello_ack (void *cls,
775                          struct GNUNET_MESH_Tunnel *tunnel,
776                          void **tunnel_ctx,
777                          const struct GNUNET_PeerIdentity *sender,
778                          const struct GNUNET_MessageHeader *message,
779                          const struct GNUNET_ATS_Information*atsi)
780 {
781   struct GNUNET_STREAM_Socket *socket = cls;
782   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
783   struct GNUNET_STREAM_HelloAckMessage *reply;
784
785   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
786   GNUNET_assert (socket->tunnel == tunnel);
787   switch (socket->state)
788   {
789   case STATE_HELLO_WAIT:
790       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
791       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
792       /* Get the random sequence number */
793       socket->write_sequence_number = 
794         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
795       reply = 
796         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
797       reply->header.header.size = 
798         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
799       reply->header.header.type = 
800         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
801       reply->sequence_number = htonl (socket->write_sequence_number);
802       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
803       queue_message (socket, 
804                      &reply->header, 
805                      &set_state_established, 
806                      NULL);      
807       return GNUNET_OK;
808   case STATE_ESTABLISHED:
809   case STATE_RECEIVE_CLOSE_WAIT:
810     // call statistics (# ACKs ignored++)
811     return GNUNET_OK;
812   case STATE_INIT:
813   default:
814     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815                 "Server sent HELLO_ACK when in state %d\n", socket->state);
816     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
817     return GNUNET_SYSERR;
818   }
819
820 }
821
822
823 /**
824  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
825  *
826  * @param cls the socket (set from GNUNET_MESH_connect)
827  * @param tunnel connection to the other end
828  * @param tunnel_ctx this is NULL
829  * @param sender who sent the message
830  * @param message the actual message
831  * @param atsi performance data for the connection
832  * @return GNUNET_OK to keep the connection open,
833  *         GNUNET_SYSERR to close it (signal serious error)
834  */
835 static int
836 client_handle_reset (void *cls,
837                      struct GNUNET_MESH_Tunnel *tunnel,
838                      void **tunnel_ctx,
839                      const struct GNUNET_PeerIdentity *sender,
840                      const struct GNUNET_MessageHeader *message,
841                      const struct GNUNET_ATS_Information*atsi)
842 {
843   struct GNUNET_STREAM_Socket *socket = cls;
844
845   return GNUNET_OK;
846 }
847
848
849 /**
850  * Common message handler for handling TRANSMIT_CLOSE messages
851  *
852  * @param socket the socket through which the ack was received
853  * @param tunnel connection to the other end
854  * @param sender who sent the message
855  * @param msg the transmit close message
856  * @param atsi performance data for the connection
857  * @return GNUNET_OK to keep the connection open,
858  *         GNUNET_SYSERR to close it (signal serious error)
859  */
860 static int
861 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
862                        struct GNUNET_MESH_Tunnel *tunnel,
863                        const struct GNUNET_PeerIdentity *sender,
864                        const struct GNUNET_STREAM_MessageHeader *msg,
865                        const struct GNUNET_ATS_Information*atsi)
866 {
867   struct GNUNET_STREAM_MessageHeader *reply;
868
869   switch (socket->state)
870     {
871     case STATE_ESTABLISHED:
872       socket->state = STATE_RECEIVE_CLOSED;
873
874       /* Send TRANSMIT_CLOSE_ACK */
875       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
876       reply->header.type = 
877         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
878       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
879       queue_message (socket, reply, NULL, NULL);
880       break;
881
882     default:
883       /* FIXME: Call statistics? */
884       break;
885     }
886   return GNUNET_YES;
887 }
888
889
890 /**
891  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
892  *
893  * @param cls the socket (set from GNUNET_MESH_connect)
894  * @param tunnel connection to the other end
895  * @param tunnel_ctx this is NULL
896  * @param sender who sent the message
897  * @param message the actual message
898  * @param atsi performance data for the connection
899  * @return GNUNET_OK to keep the connection open,
900  *         GNUNET_SYSERR to close it (signal serious error)
901  */
902 static int
903 client_handle_transmit_close (void *cls,
904                               struct GNUNET_MESH_Tunnel *tunnel,
905                               void **tunnel_ctx,
906                               const struct GNUNET_PeerIdentity *sender,
907                               const struct GNUNET_MessageHeader *message,
908                               const struct GNUNET_ATS_Information*atsi)
909 {
910   struct GNUNET_STREAM_Socket *socket = cls;
911   
912   return handle_transmit_close (socket,
913                                 tunnel,
914                                 sender,
915                                 (struct GNUNET_STREAM_MessageHeader *)message,
916                                 atsi);
917 }
918
919
920 /**
921  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
922  *
923  * @param cls the socket (set from GNUNET_MESH_connect)
924  * @param tunnel connection to the other end
925  * @param tunnel_ctx this is NULL
926  * @param sender who sent the message
927  * @param message the actual message
928  * @param atsi performance data for the connection
929  * @return GNUNET_OK to keep the connection open,
930  *         GNUNET_SYSERR to close it (signal serious error)
931  */
932 static int
933 client_handle_transmit_close_ack (void *cls,
934                                   struct GNUNET_MESH_Tunnel *tunnel,
935                                   void **tunnel_ctx,
936                                   const struct GNUNET_PeerIdentity *sender,
937                                   const struct GNUNET_MessageHeader *message,
938                                   const struct GNUNET_ATS_Information*atsi)
939 {
940   struct GNUNET_STREAM_Socket *socket = cls;
941
942   return GNUNET_OK;
943 }
944
945
946 /**
947  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
948  *
949  * @param cls the socket (set from GNUNET_MESH_connect)
950  * @param tunnel connection to the other end
951  * @param tunnel_ctx this is NULL
952  * @param sender who sent the message
953  * @param message the actual message
954  * @param atsi performance data for the connection
955  * @return GNUNET_OK to keep the connection open,
956  *         GNUNET_SYSERR to close it (signal serious error)
957  */
958 static int
959 client_handle_receive_close (void *cls,
960                              struct GNUNET_MESH_Tunnel *tunnel,
961                              void **tunnel_ctx,
962                              const struct GNUNET_PeerIdentity *sender,
963                              const struct GNUNET_MessageHeader *message,
964                              const struct GNUNET_ATS_Information*atsi)
965 {
966   struct GNUNET_STREAM_Socket *socket = cls;
967
968   return GNUNET_OK;
969 }
970
971
972 /**
973  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
974  *
975  * @param cls the socket (set from GNUNET_MESH_connect)
976  * @param tunnel connection to the other end
977  * @param tunnel_ctx this is NULL
978  * @param sender who sent the message
979  * @param message the actual message
980  * @param atsi performance data for the connection
981  * @return GNUNET_OK to keep the connection open,
982  *         GNUNET_SYSERR to close it (signal serious error)
983  */
984 static int
985 client_handle_receive_close_ack (void *cls,
986                                  struct GNUNET_MESH_Tunnel *tunnel,
987                                  void **tunnel_ctx,
988                                  const struct GNUNET_PeerIdentity *sender,
989                                  const struct GNUNET_MessageHeader *message,
990                                  const struct GNUNET_ATS_Information*atsi)
991 {
992   struct GNUNET_STREAM_Socket *socket = cls;
993
994   return GNUNET_OK;
995 }
996
997
998 /**
999  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1000  *
1001  * @param cls the socket (set from GNUNET_MESH_connect)
1002  * @param tunnel connection to the other end
1003  * @param tunnel_ctx this is NULL
1004  * @param sender who sent the message
1005  * @param message the actual message
1006  * @param atsi performance data for the connection
1007  * @return GNUNET_OK to keep the connection open,
1008  *         GNUNET_SYSERR to close it (signal serious error)
1009  */
1010 static int
1011 client_handle_close (void *cls,
1012                      struct GNUNET_MESH_Tunnel *tunnel,
1013                      void **tunnel_ctx,
1014                      const struct GNUNET_PeerIdentity *sender,
1015                      const struct GNUNET_MessageHeader *message,
1016                      const struct GNUNET_ATS_Information*atsi)
1017 {
1018   struct GNUNET_STREAM_Socket *socket = cls;
1019
1020   return GNUNET_OK;
1021 }
1022
1023
1024 /**
1025  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1026  *
1027  * @param cls the socket (set from GNUNET_MESH_connect)
1028  * @param tunnel connection to the other end
1029  * @param tunnel_ctx this is NULL
1030  * @param sender who sent the message
1031  * @param message the actual message
1032  * @param atsi performance data for the connection
1033  * @return GNUNET_OK to keep the connection open,
1034  *         GNUNET_SYSERR to close it (signal serious error)
1035  */
1036 static int
1037 client_handle_close_ack (void *cls,
1038                          struct GNUNET_MESH_Tunnel *tunnel,
1039                          void **tunnel_ctx,
1040                          const struct GNUNET_PeerIdentity *sender,
1041                          const struct GNUNET_MessageHeader *message,
1042                          const struct GNUNET_ATS_Information*atsi)
1043 {
1044   struct GNUNET_STREAM_Socket *socket = cls;
1045
1046   return GNUNET_OK;
1047 }
1048
1049 /*****************************/
1050 /* Server's Message Handlers */
1051 /*****************************/
1052
1053 /**
1054  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1055  *
1056  * @param cls the closure
1057  * @param tunnel connection to the other end
1058  * @param tunnel_ctx the socket
1059  * @param sender who sent the message
1060  * @param message the actual message
1061  * @param atsi performance data for the connection
1062  * @return GNUNET_OK to keep the connection open,
1063  *         GNUNET_SYSERR to close it (signal serious error)
1064  */
1065 static int
1066 server_handle_data (void *cls,
1067                     struct GNUNET_MESH_Tunnel *tunnel,
1068                     void **tunnel_ctx,
1069                     const struct GNUNET_PeerIdentity *sender,
1070                     const struct GNUNET_MessageHeader *message,
1071                     const struct GNUNET_ATS_Information*atsi)
1072 {
1073   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1074
1075   return handle_data (socket,
1076                       tunnel,
1077                       sender,
1078                       (const struct GNUNET_STREAM_DataMessage *)message,
1079                       atsi);
1080 }
1081
1082
1083 /**
1084  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1085  *
1086  * @param cls the closure
1087  * @param tunnel connection to the other end
1088  * @param tunnel_ctx the socket
1089  * @param sender who sent the message
1090  * @param message the actual message
1091  * @param atsi performance data for the connection
1092  * @return GNUNET_OK to keep the connection open,
1093  *         GNUNET_SYSERR to close it (signal serious error)
1094  */
1095 static int
1096 server_handle_hello (void *cls,
1097                      struct GNUNET_MESH_Tunnel *tunnel,
1098                      void **tunnel_ctx,
1099                      const struct GNUNET_PeerIdentity *sender,
1100                      const struct GNUNET_MessageHeader *message,
1101                      const struct GNUNET_ATS_Information*atsi)
1102 {
1103   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1104   struct GNUNET_STREAM_HelloAckMessage *reply;
1105
1106   GNUNET_assert (socket->tunnel == tunnel);
1107   if (STATE_INIT == socket->state)
1108     {
1109       /* Get the random sequence number */
1110       socket->write_sequence_number = 
1111         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1112       reply = 
1113         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1114       reply->header.header.size = 
1115         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1116       reply->header.header.type = 
1117         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1118       reply->sequence_number = htonl (socket->write_sequence_number);
1119       queue_message (socket, 
1120                      &reply->header,
1121                      &set_state_hello_wait, 
1122                      NULL);
1123     }
1124   else
1125     {
1126       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127                   "Client sent HELLO when in state %d\n", socket->state);
1128       /* FIXME: Send RESET? */
1129       
1130     }
1131   return GNUNET_OK;
1132 }
1133
1134
1135 /**
1136  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1137  *
1138  * @param cls the closure
1139  * @param tunnel connection to the other end
1140  * @param tunnel_ctx the socket
1141  * @param sender who sent the message
1142  * @param message the actual message
1143  * @param atsi performance data for the connection
1144  * @return GNUNET_OK to keep the connection open,
1145  *         GNUNET_SYSERR to close it (signal serious error)
1146  */
1147 static int
1148 server_handle_hello_ack (void *cls,
1149                          struct GNUNET_MESH_Tunnel *tunnel,
1150                          void **tunnel_ctx,
1151                          const struct GNUNET_PeerIdentity *sender,
1152                          const struct GNUNET_MessageHeader *message,
1153                          const struct GNUNET_ATS_Information*atsi)
1154 {
1155   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1156   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1157
1158   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1159   GNUNET_assert (socket->tunnel == tunnel);
1160   if (STATE_HELLO_WAIT == socket->state)
1161     {
1162       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1163       socket->receive_window_available = 
1164         ntohl (ack_message->receive_window_size);
1165       socket->state = STATE_ESTABLISHED;
1166     }
1167   else
1168     {
1169       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1171       /* FIXME: Send RESET? */
1172       
1173     }
1174   return GNUNET_OK;
1175 }
1176
1177
1178 /**
1179  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1180  *
1181  * @param cls the closure
1182  * @param tunnel connection to the other end
1183  * @param tunnel_ctx the socket
1184  * @param sender who sent the message
1185  * @param message the actual message
1186  * @param atsi performance data for the connection
1187  * @return GNUNET_OK to keep the connection open,
1188  *         GNUNET_SYSERR to close it (signal serious error)
1189  */
1190 static int
1191 server_handle_reset (void *cls,
1192                      struct GNUNET_MESH_Tunnel *tunnel,
1193                      void **tunnel_ctx,
1194                      const struct GNUNET_PeerIdentity *sender,
1195                      const struct GNUNET_MessageHeader *message,
1196                      const struct GNUNET_ATS_Information*atsi)
1197 {
1198   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1199
1200   return GNUNET_OK;
1201 }
1202
1203
1204 /**
1205  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1206  *
1207  * @param cls the closure
1208  * @param tunnel connection to the other end
1209  * @param tunnel_ctx the socket
1210  * @param sender who sent the message
1211  * @param message the actual message
1212  * @param atsi performance data for the connection
1213  * @return GNUNET_OK to keep the connection open,
1214  *         GNUNET_SYSERR to close it (signal serious error)
1215  */
1216 static int
1217 server_handle_transmit_close (void *cls,
1218                               struct GNUNET_MESH_Tunnel *tunnel,
1219                               void **tunnel_ctx,
1220                               const struct GNUNET_PeerIdentity *sender,
1221                               const struct GNUNET_MessageHeader *message,
1222                               const struct GNUNET_ATS_Information*atsi)
1223 {
1224   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1225
1226   return handle_transmit_close (socket,
1227                                 tunnel,
1228                                 sender,
1229                                 (struct GNUNET_STREAM_MessageHeader *)message,
1230                                 atsi);
1231 }
1232
1233
1234 /**
1235  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1236  *
1237  * @param cls the closure
1238  * @param tunnel connection to the other end
1239  * @param tunnel_ctx the socket
1240  * @param sender who sent the message
1241  * @param message the actual message
1242  * @param atsi performance data for the connection
1243  * @return GNUNET_OK to keep the connection open,
1244  *         GNUNET_SYSERR to close it (signal serious error)
1245  */
1246 static int
1247 server_handle_transmit_close_ack (void *cls,
1248                                   struct GNUNET_MESH_Tunnel *tunnel,
1249                                   void **tunnel_ctx,
1250                                   const struct GNUNET_PeerIdentity *sender,
1251                                   const struct GNUNET_MessageHeader *message,
1252                                   const struct GNUNET_ATS_Information*atsi)
1253 {
1254   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1255
1256   return GNUNET_OK;
1257 }
1258
1259
1260 /**
1261  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1262  *
1263  * @param cls the closure
1264  * @param tunnel connection to the other end
1265  * @param tunnel_ctx the socket
1266  * @param sender who sent the message
1267  * @param message the actual message
1268  * @param atsi performance data for the connection
1269  * @return GNUNET_OK to keep the connection open,
1270  *         GNUNET_SYSERR to close it (signal serious error)
1271  */
1272 static int
1273 server_handle_receive_close (void *cls,
1274                              struct GNUNET_MESH_Tunnel *tunnel,
1275                              void **tunnel_ctx,
1276                              const struct GNUNET_PeerIdentity *sender,
1277                              const struct GNUNET_MessageHeader *message,
1278                              const struct GNUNET_ATS_Information*atsi)
1279 {
1280   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1281
1282   return GNUNET_OK;
1283 }
1284
1285
1286 /**
1287  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1288  *
1289  * @param cls the closure
1290  * @param tunnel connection to the other end
1291  * @param tunnel_ctx the socket
1292  * @param sender who sent the message
1293  * @param message the actual message
1294  * @param atsi performance data for the connection
1295  * @return GNUNET_OK to keep the connection open,
1296  *         GNUNET_SYSERR to close it (signal serious error)
1297  */
1298 static int
1299 server_handle_receive_close_ack (void *cls,
1300                                  struct GNUNET_MESH_Tunnel *tunnel,
1301                                  void **tunnel_ctx,
1302                                  const struct GNUNET_PeerIdentity *sender,
1303                                  const struct GNUNET_MessageHeader *message,
1304                                  const struct GNUNET_ATS_Information*atsi)
1305 {
1306   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1307
1308   return GNUNET_OK;
1309 }
1310
1311
1312 /**
1313  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1314  *
1315  * @param cls the closure
1316  * @param tunnel connection to the other end
1317  * @param tunnel_ctx the socket
1318  * @param sender who sent the message
1319  * @param message the actual message
1320  * @param atsi performance data for the connection
1321  * @return GNUNET_OK to keep the connection open,
1322  *         GNUNET_SYSERR to close it (signal serious error)
1323  */
1324 static int
1325 server_handle_close (void *cls,
1326                      struct GNUNET_MESH_Tunnel *tunnel,
1327                      void **tunnel_ctx,
1328                      const struct GNUNET_PeerIdentity *sender,
1329                      const struct GNUNET_MessageHeader *message,
1330                      const struct GNUNET_ATS_Information*atsi)
1331 {
1332   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1333
1334   return GNUNET_OK;
1335 }
1336
1337
1338 /**
1339  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1340  *
1341  * @param cls the closure
1342  * @param tunnel connection to the other end
1343  * @param tunnel_ctx the socket
1344  * @param sender who sent the message
1345  * @param message the actual message
1346  * @param atsi performance data for the connection
1347  * @return GNUNET_OK to keep the connection open,
1348  *         GNUNET_SYSERR to close it (signal serious error)
1349  */
1350 static int
1351 server_handle_close_ack (void *cls,
1352                          struct GNUNET_MESH_Tunnel *tunnel,
1353                          void **tunnel_ctx,
1354                          const struct GNUNET_PeerIdentity *sender,
1355                          const struct GNUNET_MessageHeader *message,
1356                          const struct GNUNET_ATS_Information*atsi)
1357 {
1358   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1359
1360   return GNUNET_OK;
1361 }
1362
1363
1364 /**
1365  * Message Handler for mesh
1366  *
1367  * @param socket the socket through which the ack was received
1368  * @param tunnel connection to the other end
1369  * @param sender who sent the message
1370  * @param ack the acknowledgment message
1371  * @param atsi performance data for the connection
1372  * @return GNUNET_OK to keep the connection open,
1373  *         GNUNET_SYSERR to close it (signal serious error)
1374  */
1375 static int
1376 handle_ack (struct GNUNET_STREAM_Socket *socket,
1377             struct GNUNET_MESH_Tunnel *tunnel,
1378             const struct GNUNET_PeerIdentity *sender,
1379             const struct GNUNET_STREAM_AckMessage *ack,
1380             const struct GNUNET_ATS_Information*atsi)
1381 {
1382   switch (socket->state)
1383     {
1384     case (STATE_ESTABLISHED):
1385       if (NULL == socket->write_handle)
1386         {
1387           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1388                       "Received DATA ACK when write_handle is NULL\n");
1389           return GNUNET_OK;
1390         }
1391
1392       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1393       socket->write_handle->receive_window_available = 
1394         ntohl (ack->receive_window_remaining);
1395       write_data (socket);
1396       break;
1397     default:
1398       break;
1399     }
1400   return GNUNET_OK;
1401 }
1402
1403
1404 /**
1405  * Message Handler for mesh
1406  *
1407  * @param cls the 'struct GNUNET_STREAM_Socket'
1408  * @param tunnel connection to the other end
1409  * @param tunnel_ctx unused
1410  * @param sender who sent the message
1411  * @param message the actual message
1412  * @param atsi performance data for the connection
1413  * @return GNUNET_OK to keep the connection open,
1414  *         GNUNET_SYSERR to close it (signal serious error)
1415  */
1416 static int
1417 client_handle_ack (void *cls,
1418                    struct GNUNET_MESH_Tunnel *tunnel,
1419                    void **tunnel_ctx,
1420                    const struct GNUNET_PeerIdentity *sender,
1421                    const struct GNUNET_MessageHeader *message,
1422                    const struct GNUNET_ATS_Information*atsi)
1423 {
1424   struct GNUNET_STREAM_Socket *socket = cls;
1425   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1426  
1427   return handle_ack (socket, tunnel, sender, ack, atsi);
1428 }
1429
1430
1431 /**
1432  * Message Handler for mesh
1433  *
1434  * @param cls the server's listen socket
1435  * @param tunnel connection to the other end
1436  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1437  * @param sender who sent the message
1438  * @param message the actual message
1439  * @param atsi performance data for the connection
1440  * @return GNUNET_OK to keep the connection open,
1441  *         GNUNET_SYSERR to close it (signal serious error)
1442  */
1443 static int
1444 server_handle_ack (void *cls,
1445                    struct GNUNET_MESH_Tunnel *tunnel,
1446                    void **tunnel_ctx,
1447                    const struct GNUNET_PeerIdentity *sender,
1448                    const struct GNUNET_MessageHeader *message,
1449                    const struct GNUNET_ATS_Information*atsi)
1450 {
1451   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1452   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1453  
1454   return handle_ack (socket, tunnel, sender, ack, atsi);
1455 }
1456
1457
1458 /**
1459  * For client message handlers, the stream socket is in the
1460  * closure argument.
1461  */
1462 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1463   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1464   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1465    sizeof (struct GNUNET_STREAM_AckMessage) },
1466   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1467    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1468   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1469    sizeof (struct GNUNET_STREAM_MessageHeader)},
1470   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1471    sizeof (struct GNUNET_STREAM_MessageHeader)},
1472   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1473    sizeof (struct GNUNET_STREAM_MessageHeader)},
1474   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1475    sizeof (struct GNUNET_STREAM_MessageHeader)},
1476   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1477    sizeof (struct GNUNET_STREAM_MessageHeader)},
1478   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1479    sizeof (struct GNUNET_STREAM_MessageHeader)},
1480   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1481    sizeof (struct GNUNET_STREAM_MessageHeader)},
1482   {NULL, 0, 0}
1483 };
1484
1485
1486 /**
1487  * For server message handlers, the stream socket is in the
1488  * tunnel context, and the listen socket in the closure argument.
1489  */
1490 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1491   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1492   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1493    sizeof (struct GNUNET_STREAM_AckMessage) },
1494   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1495    sizeof (struct GNUNET_STREAM_MessageHeader)},
1496   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1497    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1498   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1499    sizeof (struct GNUNET_STREAM_MessageHeader)},
1500   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1501    sizeof (struct GNUNET_STREAM_MessageHeader)},
1502   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1503    sizeof (struct GNUNET_STREAM_MessageHeader)},
1504   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1505    sizeof (struct GNUNET_STREAM_MessageHeader)},
1506   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1507    sizeof (struct GNUNET_STREAM_MessageHeader)},
1508   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1509    sizeof (struct GNUNET_STREAM_MessageHeader)},
1510   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1511    sizeof (struct GNUNET_STREAM_MessageHeader)},
1512   {NULL, 0, 0}
1513 };
1514
1515
1516 /**
1517  * Function called when our target peer is connected to our tunnel
1518  *
1519  * @param cls the socket for which this tunnel is created
1520  * @param peer the peer identity of the target
1521  * @param atsi performance data for the connection
1522  */
1523 static void
1524 mesh_peer_connect_callback (void *cls,
1525                             const struct GNUNET_PeerIdentity *peer,
1526                             const struct GNUNET_ATS_Information * atsi)
1527 {
1528   struct GNUNET_STREAM_Socket *socket = cls;
1529   struct GNUNET_STREAM_MessageHeader *message;
1530
1531   if (0 != memcmp (&socket->other_peer, 
1532                    peer, 
1533                    sizeof (struct GNUNET_PeerIdentity)))
1534     {
1535       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536                   "A peer (%s) which is not our target has connected to our tunnel", 
1537                   GNUNET_i2s (peer));
1538       return;
1539     }
1540   
1541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542               "Target peer %s connected\n", GNUNET_i2s (peer));
1543   
1544   /* Set state to INIT */
1545   socket->state = STATE_INIT;
1546
1547   /* Send HELLO message */
1548   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1549   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1550   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1551   queue_message (socket,
1552                  message,
1553                  &set_state_hello_wait,
1554                  NULL);
1555
1556   /* Call open callback */
1557   if (NULL == socket->open_cb)
1558     {
1559       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1560                   "STREAM_open callback is NULL\n");
1561     }
1562   else
1563     {
1564       socket->open_cb (socket->open_cls, socket);
1565     }
1566 }
1567
1568
1569 /**
1570  * Function called when our target peer is disconnected from our tunnel
1571  *
1572  * @param cls the socket associated which this tunnel
1573  * @param peer the peer identity of the target
1574  */
1575 static void
1576 mesh_peer_disconnect_callback (void *cls,
1577                                const struct GNUNET_PeerIdentity *peer)
1578 {
1579
1580 }
1581
1582
1583 /*****************/
1584 /* API functions */
1585 /*****************/
1586
1587
1588 /**
1589  * Tries to open a stream to the target peer
1590  *
1591  * @param cfg configuration to use
1592  * @param target the target peer to which the stream has to be opened
1593  * @param app_port the application port number which uniquely identifies this
1594  *            stream
1595  * @param open_cb this function will be called after stream has be established 
1596  * @param open_cb_cls the closure for open_cb
1597  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1598  * @return if successful it returns the stream socket; NULL if stream cannot be
1599  *         opened 
1600  */
1601 struct GNUNET_STREAM_Socket *
1602 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1603                     const struct GNUNET_PeerIdentity *target,
1604                     GNUNET_MESH_ApplicationType app_port,
1605                     GNUNET_STREAM_OpenCallback open_cb,
1606                     void *open_cb_cls,
1607                     ...)
1608 {
1609   struct GNUNET_STREAM_Socket *socket;
1610   enum GNUNET_STREAM_Option option;
1611   va_list vargs;                /* Variable arguments */
1612
1613   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1614   socket->other_peer = *target;
1615   socket->open_cb = open_cb;
1616   socket->open_cls = open_cb_cls;
1617
1618   /* Set defaults */
1619   socket->retransmit_timeout = 
1620     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1621
1622   va_start (vargs, open_cb_cls); /* Parse variable args */
1623   do {
1624     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1625     switch (option)
1626       {
1627       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1628         /* Expect struct GNUNET_TIME_Relative */
1629         socket->retransmit_timeout = va_arg (vargs,
1630                                              struct GNUNET_TIME_Relative);
1631         break;
1632       case GNUNET_STREAM_OPTION_END:
1633         break;
1634       }
1635   } while (GNUNET_STREAM_OPTION_END != option);
1636   va_end (vargs);               /* End of variable args parsing */
1637
1638   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1639                                       1,  /* QUEUE size as parameter? */
1640                                       socket, /* cls */
1641                                       NULL, /* No inbound tunnel handler */
1642                                       NULL, /* No inbound tunnel cleaner */
1643                                       client_message_handlers,
1644                                       NULL); /* We don't get inbound tunnels */
1645   // FIXME: if (NULL == socket->mesh) ...
1646
1647   /* Now create the mesh tunnel to target */
1648   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1649                                               NULL, /* Tunnel context */
1650                                               &mesh_peer_connect_callback,
1651                                               &mesh_peer_disconnect_callback,
1652                                               socket);
1653   // FIXME: if (NULL == socket->tunnel) ...
1654
1655   return socket;
1656 }
1657
1658
1659 /**
1660  * Closes the stream
1661  *
1662  * @param socket the stream socket
1663  */
1664 void
1665 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1666 {
1667   struct MessageQueue *head;
1668
1669   /* Clear Transmit handles */
1670   if (NULL != socket->transmit_handle)
1671     {
1672       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1673       socket->transmit_handle = NULL;
1674     }
1675
1676   /* Clear existing message queue */
1677   while (NULL != (head = socket->queue_head)) {
1678     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1679                                  socket->queue_tail,
1680                                  head);
1681     GNUNET_free (head->message);
1682     GNUNET_free (head);
1683   }
1684
1685   /* Close associated tunnel */
1686   if (NULL != socket->tunnel)
1687     {
1688       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1689       socket->tunnel = NULL;
1690     }
1691
1692   /* Close mesh connection */
1693   if (NULL != socket->mesh)
1694     {
1695       GNUNET_MESH_disconnect (socket->mesh);
1696       socket->mesh = NULL;
1697     }
1698   
1699   /* Release receive buffer */
1700   if (NULL != socket->receive_buffer)
1701     {
1702       GNUNET_free (socket->receive_buffer);
1703     }
1704
1705   GNUNET_free (socket);
1706 }
1707
1708
1709 /**
1710  * Method called whenever a peer creates a tunnel to us
1711  *
1712  * @param cls closure
1713  * @param tunnel new handle to the tunnel
1714  * @param initiator peer that started the tunnel
1715  * @param atsi performance information for the tunnel
1716  * @return initial tunnel context for the tunnel
1717  *         (can be NULL -- that's not an error)
1718  */
1719 static void *
1720 new_tunnel_notify (void *cls,
1721                    struct GNUNET_MESH_Tunnel *tunnel,
1722                    const struct GNUNET_PeerIdentity *initiator,
1723                    const struct GNUNET_ATS_Information *atsi)
1724 {
1725   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1726   struct GNUNET_STREAM_Socket *socket;
1727
1728   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1729   socket->tunnel = tunnel;
1730   socket->session_id = 0;       /* FIXME */
1731   socket->other_peer = *initiator;
1732   socket->state = STATE_INIT;
1733
1734   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1735                                            socket,
1736                                            &socket->other_peer))
1737     {
1738       socket->state = STATE_CLOSED;
1739       /* FIXME: Send CLOSE message and then free */
1740       GNUNET_free (socket);
1741       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1742     }
1743   return socket;
1744 }
1745
1746
1747 /**
1748  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1749  * any associated state.  This function is NOT called if the client has
1750  * explicitly asked for the tunnel to be destroyed using
1751  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1752  * the tunnel.
1753  *
1754  * @param cls closure (set from GNUNET_MESH_connect)
1755  * @param tunnel connection to the other end (henceforth invalid)
1756  * @param tunnel_ctx place where local state associated
1757  *                   with the tunnel is stored
1758  */
1759 static void 
1760 tunnel_cleaner (void *cls,
1761                 const struct GNUNET_MESH_Tunnel *tunnel,
1762                 void *tunnel_ctx)
1763 {
1764   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1765   
1766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1767               "Peer %s has terminated connection abruptly\n",
1768               GNUNET_i2s (&socket->other_peer));
1769
1770   socket->status = GNUNET_STREAM_SHUTDOWN;
1771   /* Clear Transmit handles */
1772   if (NULL != socket->transmit_handle)
1773     {
1774       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1775       socket->transmit_handle = NULL;
1776     }
1777   socket->tunnel = NULL;
1778 }
1779
1780
1781 /**
1782  * Listens for stream connections for a specific application ports
1783  *
1784  * @param cfg the configuration to use
1785  * @param app_port the application port for which new streams will be accepted
1786  * @param listen_cb this function will be called when a peer tries to establish
1787  *            a stream with us
1788  * @param listen_cb_cls closure for listen_cb
1789  * @return listen socket, NULL for any error
1790  */
1791 struct GNUNET_STREAM_ListenSocket *
1792 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1793                       GNUNET_MESH_ApplicationType app_port,
1794                       GNUNET_STREAM_ListenCallback listen_cb,
1795                       void *listen_cb_cls)
1796 {
1797   /* FIXME: Add variable args for passing configration options? */
1798   struct GNUNET_STREAM_ListenSocket *lsocket;
1799   GNUNET_MESH_ApplicationType app_types[2];
1800
1801   app_types[0] = app_port;
1802   app_types[1] = 0;
1803   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1804   lsocket->port = app_port;
1805   lsocket->listen_cb = listen_cb;
1806   lsocket->listen_cb_cls = listen_cb_cls;
1807   lsocket->mesh = GNUNET_MESH_connect (cfg,
1808                                        10, /* FIXME: QUEUE size as parameter? */
1809                                        lsocket, /* Closure */
1810                                        &new_tunnel_notify,
1811                                        &tunnel_cleaner,
1812                                        server_message_handlers,
1813                                        app_types);
1814   return lsocket;
1815 }
1816
1817
1818 /**
1819  * Closes the listen socket
1820  *
1821  * @param lsocket the listen socket
1822  */
1823 void
1824 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1825 {
1826   /* Close MESH connection */
1827   GNUNET_MESH_disconnect (lsocket->mesh);
1828   
1829   GNUNET_free (lsocket);
1830 }
1831
1832
1833 /**
1834  * Tries to write the given data to the stream
1835  *
1836  * @param socket the socket representing a stream
1837  * @param data the data buffer from where the data is written into the stream
1838  * @param size the number of bytes to be written from the data buffer
1839  * @param timeout the timeout period
1840  * @param write_cont the function to call upon writing some bytes into the stream
1841  * @param write_cont_cls the closure
1842  * @return handle to cancel the operation
1843  */
1844 struct GNUNET_STREAM_IOHandle *
1845 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1846                      const void *data,
1847                      size_t size,
1848                      struct GNUNET_TIME_Relative timeout,
1849                      GNUNET_STREAM_CompletionContinuation write_cont,
1850                      void *write_cont_cls)
1851 {
1852   unsigned int num_needed_packets;
1853   unsigned int packet;
1854   struct GNUNET_STREAM_IOHandle *io_handle;
1855   size_t packet_size;
1856   struct GNUNET_STREAM_DataMessage *data_msg;
1857   const void *sweep;
1858
1859   /* Return NULL if there is already a write request pending */
1860   if (NULL != socket->write_handle)
1861   {
1862     GNUNET_break (0);
1863     return NULL;
1864   }
1865   if (!((STATE_ESTABLISHED == socket->state)
1866         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
1867         || (STATE_RECEIVE_CLOSED == socket->state)))
1868     {
1869       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1870                   "Attempting to write on a closed (OR) not-yet-established"
1871                   "stream\n"); 
1872       return NULL;
1873     } 
1874   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
1875     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
1876   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
1877   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
1878   io_handle->receive_window_available = socket->receive_window_available;
1879   sweep = data;
1880   /* Divide the given buffer into packets for sending */
1881   for (packet=0; packet < num_needed_packets; packet++)
1882     {
1883       if ((packet + 1) * max_payload_size < size) 
1884         {
1885           packet_size = MAX_PACKET_SIZE;
1886         }
1887       else 
1888         {
1889           packet_size = size - packet * max_payload_size
1890             + sizeof (struct GNUNET_STREAM_DataMessage);
1891         }
1892       io_handle->messages[packet] = GNUNET_malloc (packet_size);
1893       io_handle->messages[packet]->header.header.size = htons (packet_size);
1894       io_handle->messages[packet]->header.header.type =
1895         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
1896       io_handle->messages[packet]->sequence_number =
1897         htons (socket->write_sequence_number++);
1898
1899       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
1900          determined from RTT */
1901       io_handle->messages[packet]->ack_deadline = 
1902         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
1903                                    (GNUNET_TIME_UNIT_SECONDS, 5));
1904       data_msg = io_handle->messages[packet];
1905       /* Copy data from given buffer to the packet */
1906       memcpy (&data_msg[1],
1907               sweep,
1908               packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
1909       sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
1910     }
1911   socket->write_handle = io_handle;
1912   write_data (socket);
1913
1914   return io_handle;
1915 }
1916
1917
1918 /**
1919  * Tries to read data from the stream
1920  *
1921  * @param socket the socket representing a stream
1922  * @param timeout the timeout period
1923  * @param proc function to call with data (once only)
1924  * @param proc_cls the closure for proc
1925  * @return handle to cancel the operation
1926  */
1927 struct GNUNET_STREAM_IOHandle *
1928 GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket,
1929                     struct GNUNET_TIME_Relative timeout,
1930                     GNUNET_STREAM_DataProcessor proc,
1931                     void *proc_cls)
1932 {
1933   /* Check the bitmap for any holes */
1934
1935   /* Deem the data from the starting of the bitmap upto a hole as available
1936   data */
1937
1938   /* Create an IO handle */
1939
1940   /* Call the Data processor with this available data */
1941   
1942   /* Update the read_sequence_number to the first hole in the bitmap */
1943
1944   /* Shift the bitmap so that the first hole is now at the start */
1945 }