0f88af92b4e86f2bde3f954b2d6aa5f95954b5d6
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file stream/stream_api.c
23  * @brief Implementation of the stream library
24  * @author Sree Harsha Totakura
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_crypto_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "stream_protocol.h"
31
32
33 /**
34  * The maximum packet size of a stream packet
35  */
36 #define MAX_PACKET_SIZE 64000
37
38 /**
39  * The maximum payload a data message packet can carry
40  */
41 static size_t max_payload_size = 
42   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44 /**
45  * Receive buffer
46  */
47 #define RECEIVE_BUFFER_SIZE 4096000
48
49 /**
50  * states in the Protocol
51  */
52 enum State
53   {
54     /**
55      * Client initialization state
56      */
57     STATE_INIT,
58
59     /**
60      * Listener initialization state 
61      */
62     STATE_LISTEN,
63
64     /**
65      * Pre-connection establishment state
66      */
67     STATE_HELLO_WAIT,
68
69     /**
70      * State where a connection has been established
71      */
72     STATE_ESTABLISHED,
73
74     /**
75      * State where the socket is closed on our side and waiting to be ACK'ed
76      */
77     STATE_RECEIVE_CLOSE_WAIT,
78
79     /**
80      * State where the socket is closed for reading
81      */
82     STATE_RECEIVE_CLOSED,
83
84     /**
85      * State where the socket is closed on our side and waiting to be ACK'ed
86      */
87     STATE_TRANSMIT_CLOSE_WAIT,
88
89     /**
90      * State where the socket is closed for writing
91      */
92     STATE_TRANSMIT_CLOSED,
93
94     /**
95      * State where the socket is closed on our side and waiting to be ACK'ed
96      */
97     STATE_CLOSE_WAIT,
98
99     /**
100      * State where the socket is closed
101      */
102     STATE_CLOSED 
103   };
104
105
106 /**
107  * Functions of this type are called when a message is written
108  *
109  * @param socket the socket the written message was bound to
110  */
111 typedef void (*SendFinishCallback) (void *cls,
112                                     struct GNUNET_STREAM_Socket *socket);
113
114
115 /**
116  * The send message queue
117  */
118 struct MessageQueue
119 {
120   /**
121    * The message
122    */
123   struct GNUNET_STREAM_MessageHeader *message;
124
125   /**
126    * Callback to be called when the message is sent
127    */
128   SendFinishCallback finish_cb;
129
130   /**
131    * The closure for finish_cb
132    */
133   void *finish_cb_cls;
134
135   /**
136    * The next message in queue. Should be NULL in the last message
137    */
138   struct MessageQueue *next;
139
140   /**
141    * The next message in queue. Should be NULL in the last message
142    */
143   struct MessageQueue *prev;
144 };
145
146
147 /**
148  * The STREAM Socket Handler
149  */
150 struct GNUNET_STREAM_Socket
151 {
152
153   /**
154    * The peer identity of the peer at the other end of the stream
155    */
156   struct GNUNET_PeerIdentity other_peer;
157
158   /**
159    * Retransmission timeout
160    */
161   struct GNUNET_TIME_Relative retransmit_timeout;
162
163   /**
164    * The mesh handle
165    */
166   struct GNUNET_MESH_Handle *mesh;
167
168   /**
169    * The mesh tunnel handle
170    */
171   struct GNUNET_MESH_Tunnel *tunnel;
172
173   /**
174    * Stream open closure
175    */
176   void *open_cls;
177
178   /**
179    * Stream open callback
180    */
181   GNUNET_STREAM_OpenCallback open_cb;
182
183   /**
184    * The current transmit handle (if a pending transmit request exists)
185    */
186   struct GNUNET_MESH_TransmitHandle *transmit_handle;
187
188   /**
189    * The current message associated with the transmit handle
190    */
191   struct MessageQueue *queue_head;
192
193   /**
194    * The queue tail, should always point to the last message in queue
195    */
196   struct MessageQueue *queue_tail;
197
198   /**
199    * The write IO_handle associated with this socket
200    */
201   struct GNUNET_STREAM_IOHandle *write_handle;
202
203   /**
204    * The read IO_handle associated with this socket
205    */
206   struct GNUNET_STREAM_IOHandle *read_handle;
207
208   /**
209    * The state of the protocol associated with this socket
210    */
211   enum State state;
212
213   /**
214    * The status of the socket
215    */
216   enum GNUNET_STREAM_Status status;
217
218   /**
219    * The number of previous timeouts; FIXME: currently not used
220    */
221   unsigned int retries;
222
223   /**
224    * The session id associated with this stream connection
225    */
226   uint32_t session_id;
227
228   /**
229    * Write sequence number. Start at random upon reaching ESTABLISHED state
230    */
231   uint32_t write_sequence_number;
232
233   /**
234    * Read sequence number. This number's value is determined during handshake
235    */
236   uint32_t read_sequence_number;
237
238   /**
239    * receiver's available buffer
240    */
241   uint32_t receive_window_available;
242 };
243
244
245 /**
246  * A socket for listening
247  */
248 struct GNUNET_STREAM_ListenSocket
249 {
250
251   /**
252    * The mesh handle
253    */
254   struct GNUNET_MESH_Handle *mesh;
255
256   /**
257    * The callback function which is called after successful opening socket
258    */
259   GNUNET_STREAM_ListenCallback listen_cb;
260
261   /**
262    * The call back closure
263    */
264   void *listen_cb_cls;
265
266   /**
267    * The service port
268    */
269   GNUNET_MESH_ApplicationType port;
270 };
271
272
273 /**
274  * The IO Handle
275  */
276 struct GNUNET_STREAM_IOHandle
277 {
278   /**
279    * The packet_buffers associated with this Handle
280    */
281   struct GNUNET_STREAM_DataMessage *messages[64];
282
283   /**
284    * The bitmap of this IOHandle; Corresponding bit for a message is set when
285    * it has been acknowledged by the receiver
286    */
287   GNUNET_STREAM_AckBitmap ack_bitmap;
288
289   /**
290    * receiver's available buffer
291    */
292   uint32_t receive_window_available;
293
294   /**
295    * Number of packets sent before waiting for an ack
296    *
297    * FIXME: Do we need this?
298    */
299   unsigned int sent_packets;
300 };
301
302
303 /**
304  * Default value in seconds for various timeouts
305  */
306 static unsigned int default_timeout = 300;
307
308
309 /**
310  * Callback function for sending hello message
311  *
312  * @param cls closure the socket
313  * @param size number of bytes available in buf
314  * @param buf where the callee should write the message
315  * @return number of bytes written to buf
316  */
317 static size_t
318 send_message_notify (void *cls, size_t size, void *buf)
319 {
320   struct GNUNET_STREAM_Socket *socket = cls;
321   struct MessageQueue *head;
322   size_t ret;
323
324   socket->transmit_handle = NULL; /* Remove the transmit handle */
325   head = socket->queue_head;
326   if (NULL == head)
327     return 0; /* just to be safe */
328   if (0 == size)                /* request timed out */
329     {
330       socket->retries++;
331       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
332                   "Message sending timed out. Retry %d \n",
333                   socket->retries);
334       socket->transmit_handle = 
335         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
336                                            0, /* Corking */
337                                            1, /* Priority */
338                                            /* FIXME: exponential backoff */
339                                            socket->retransmit_timeout,
340                                            &socket->other_peer,
341                                            ntohs (head->message->header.size),
342                                            &send_message_notify,
343                                            socket);
344       return 0;
345     }
346
347   ret = ntohs (head->message->header.size);
348   GNUNET_assert (size >= ret);
349   memcpy (buf, head->message, ret);
350   if (NULL != head->finish_cb)
351     {
352       head->finish_cb (socket, head->finish_cb_cls);
353     }
354   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
355                                socket->queue_tail,
356                                head);
357   GNUNET_free (head->message);
358   GNUNET_free (head);
359   head = socket->queue_head;
360   if (NULL != head)    /* more pending messages to send */
361     {
362       socket->retries = 0;
363       socket->transmit_handle = 
364         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
365                                            0, /* Corking */
366                                            1, /* Priority */
367                                            /* FIXME: exponential backoff */
368                                            socket->retransmit_timeout,
369                                            &socket->other_peer,
370                                            ntohs (head->message->header.size),
371                                            &send_message_notify,
372                                            socket);
373     }
374   return ret;
375 }
376
377
378 /**
379  * Queues a message for sending using the mesh connection of a socket
380  *
381  * @param socket the socket whose mesh connection is used
382  * @param message the message to be sent
383  * @param finish_cb the callback to be called when the message is sent
384  * @param finish_cb_cls the closure for the callback
385  */
386 static void
387 queue_message (struct GNUNET_STREAM_Socket *socket,
388                struct GNUNET_STREAM_MessageHeader *message,
389                SendFinishCallback finish_cb,
390                void *finish_cb_cls)
391 {
392   struct MessageQueue *queue_entity;
393
394   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
395   queue_entity->message = message;
396   queue_entity->finish_cb = finish_cb;
397   queue_entity->finish_cb_cls = finish_cb_cls;
398   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
399                                     socket->queue_tail,
400                                     queue_entity);
401   if (NULL == socket->transmit_handle)
402   {
403     socket->retries = 0;
404     socket->transmit_handle = 
405       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
406                                          0, /* Corking */
407                                          1, /* Priority */
408                                          socket->retransmit_timeout,
409                                          &socket->other_peer,
410                                          ntohs (message->header.size),
411                                          &send_message_notify,
412                                          socket);
413   }
414 }
415
416
417 /**
418  * Function to modify a bit in GNUNET_STREAM_AckBitmap
419  *
420  * @param bitmap the bitmap to modify
421  * @param bit the bit number to modify
422  * @param value GNUNET_YES to on, GNUNET_NO to off
423  */
424 static void
425 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
426                       unsigned int bit, 
427                       int value)
428 {
429   GNUNET_assert (bit < 64);
430   if (GNUNET_YES == value)
431     *bitmap |= (1LL << bit);
432   else
433     *bitmap &= ~(1LL << bit);
434 }
435
436
437 /**
438  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
439  *
440  * @param bit the bit number to check
441  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
442  */
443 static uint8_t
444 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
445                       unsigned int bit)
446 {
447   GNUNET_assert (bit < 64);
448   return 0 != (*bitmap & (1LL << bit));
449 }
450
451
452
453 /**
454  * Function called when Data Message is sent
455  *
456  * @param cls the io_handle corresponding to the Data Message
457  * @param socket the socket which was used
458  */
459 static void
460 write_data_finish_cb (void *cls,
461                       struct GNUNET_STREAM_Socket *socket)
462 {
463   struct GNUNET_STREAM_IOHandle *io_handle = cls;
464
465   io_handle->sent_packets++;
466 }
467
468
469 /**
470  * Writes data using the given socket. The amount of data written is limited by
471  * the receive_window_size
472  *
473  * @param socket the socket to use
474  */
475 static void 
476 write_data (struct GNUNET_STREAM_Socket *socket)
477 {
478   struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
479   unsigned int packet;
480   int ack_packet;
481
482   ack_packet = -1;
483   /* Find the last acknowledged packet */
484   for (packet=0; packet < 64; packet++)
485     {
486       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
487                                               packet))
488         {
489           ack_packet = packet;
490         }
491     }
492   /* Resend packets which weren't ack'ed */
493   for (packet=0; packet < ack_packet; packet++)
494     {
495       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
496                                              packet))
497         {
498           queue_message (socket,
499                          &io_handle->messages[packet]->header,
500                          NULL,
501                          NULL);
502         }
503     }
504   packet = ack_packet + 1;
505   /* Now send new packets if there is enough buffer space */
506   while (io_handle->receive_window_available -=
507          io_handle->messages[packet]->header.header.size > 0)
508     {
509       queue_message (socket,
510                      &io_handle->messages[packet]->header,
511                      &write_data_finish_cb,
512                      io_handle);
513     }
514 }
515
516
517 /**
518  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
519  *
520  * @param cls the socket (set from GNUNET_MESH_connect)
521  * @param tunnel connection to the other end
522  * @param tunnel_ctx place to store local state associated with the tunnel
523  * @param sender who sent the message
524  * @param message the actual message
525  * @param atsi performance data for the connection
526  * @return GNUNET_OK to keep the connection open,
527  *         GNUNET_SYSERR to close it (signal serious error)
528  */
529 static int
530 client_handle_data (void *cls,
531              struct GNUNET_MESH_Tunnel *tunnel,
532              void **tunnel_ctx,
533              const struct GNUNET_PeerIdentity *sender,
534              const struct GNUNET_MessageHeader *message,
535              const struct GNUNET_ATS_Information*atsi)
536 {
537   struct GNUNET_STREAM_Socket *socket = cls;
538   uint16_t size;
539   const struct GNUNET_STREAM_DataMessage *data_msg;
540   const void *payload;
541
542   size = ntohs (message->size);
543   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
544   {
545     GNUNET_break_op (0);
546     return GNUNET_SYSERR;
547   }
548   data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
549   size -= sizeof (struct GNUNET_STREAM_DataMessage);
550   payload = &data_msg[1];
551   /* ... */
552   
553   return GNUNET_OK;
554 }
555
556
557 /**
558  * Callback to set state to ESTABLISHED
559  *
560  * @param cls the closure from queue_message FIXME: document
561  * @param socket the socket to requiring state change
562  */
563 static void
564 set_state_established (void *cls,
565                        struct GNUNET_STREAM_Socket *socket)
566 {
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
568   socket->state = STATE_ESTABLISHED;
569 }
570
571
572 /**
573  * Callback to set state to HELLO_WAIT
574  *
575  * @param cls the closure from queue_message
576  * @param socket the socket to requiring state change
577  */
578 static void
579 set_state_hello_wait (void *cls,
580                       struct GNUNET_STREAM_Socket *socket)
581 {
582   GNUNET_assert (STATE_INIT == socket->state);
583   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
584   socket->state = STATE_HELLO_WAIT;
585 }
586
587
588 /**
589  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
590  *
591  * @param cls the socket (set from GNUNET_MESH_connect)
592  * @param tunnel connection to the other end
593  * @param tunnel_ctx this is NULL
594  * @param sender who sent the message
595  * @param message the actual message
596  * @param atsi performance data for the connection
597  * @return GNUNET_OK to keep the connection open,
598  *         GNUNET_SYSERR to close it (signal serious error)
599  */
600 static int
601 client_handle_hello_ack (void *cls,
602                          struct GNUNET_MESH_Tunnel *tunnel,
603                          void **tunnel_ctx,
604                          const struct GNUNET_PeerIdentity *sender,
605                          const struct GNUNET_MessageHeader *message,
606                          const struct GNUNET_ATS_Information*atsi)
607 {
608   struct GNUNET_STREAM_Socket *socket = cls;
609   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
610   struct GNUNET_STREAM_HelloAckMessage *reply;
611
612   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
613   GNUNET_assert (socket->tunnel == tunnel);
614   switch (socket->state)
615   {
616   case STATE_HELLO_WAIT:
617       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
618       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
619       /* Get the random sequence number */
620       socket->write_sequence_number = 
621         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
622       reply = 
623         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
624       reply->header.header.size = 
625         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
626       reply->header.header.type = 
627         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
628       reply->sequence_number = htonl (socket->write_sequence_number);
629       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
630       queue_message (socket, 
631                      &reply->header, 
632                      &set_state_established, 
633                      NULL);      
634       return GNUNET_OK;
635   case STATE_ESTABLISHED:
636   case STATE_RECEIVE_CLOSE_WAIT:
637     // call statistics (# ACKs ignored++)
638     return GNUNET_OK;
639   case STATE_INIT:
640   default:
641     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642                 "Server sent HELLO_ACK when in state %d\n", socket->state);
643     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
644     return GNUNET_SYSERR;
645   }
646
647 }
648
649
650 /**
651  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
652  *
653  * @param cls the socket (set from GNUNET_MESH_connect)
654  * @param tunnel connection to the other end
655  * @param tunnel_ctx this is NULL
656  * @param sender who sent the message
657  * @param message the actual message
658  * @param atsi performance data for the connection
659  * @return GNUNET_OK to keep the connection open,
660  *         GNUNET_SYSERR to close it (signal serious error)
661  */
662 static int
663 client_handle_reset (void *cls,
664                      struct GNUNET_MESH_Tunnel *tunnel,
665                      void **tunnel_ctx,
666                      const struct GNUNET_PeerIdentity *sender,
667                      const struct GNUNET_MessageHeader *message,
668                      const struct GNUNET_ATS_Information*atsi)
669 {
670   struct GNUNET_STREAM_Socket *socket = cls;
671
672   return GNUNET_OK;
673 }
674
675
676 /**
677  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
678  *
679  * @param cls the socket (set from GNUNET_MESH_connect)
680  * @param tunnel connection to the other end
681  * @param tunnel_ctx this is NULL
682  * @param sender who sent the message
683  * @param message the actual message
684  * @param atsi performance data for the connection
685  * @return GNUNET_OK to keep the connection open,
686  *         GNUNET_SYSERR to close it (signal serious error)
687  */
688 static int
689 client_handle_transmit_close (void *cls,
690                               struct GNUNET_MESH_Tunnel *tunnel,
691                               void **tunnel_ctx,
692                               const struct GNUNET_PeerIdentity *sender,
693                               const struct GNUNET_MessageHeader *message,
694                               const struct GNUNET_ATS_Information*atsi)
695 {
696   struct GNUNET_STREAM_Socket *socket = cls;
697
698   return GNUNET_OK;
699 }
700
701
702 /**
703  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
704  *
705  * @param cls the socket (set from GNUNET_MESH_connect)
706  * @param tunnel connection to the other end
707  * @param tunnel_ctx this is NULL
708  * @param sender who sent the message
709  * @param message the actual message
710  * @param atsi performance data for the connection
711  * @return GNUNET_OK to keep the connection open,
712  *         GNUNET_SYSERR to close it (signal serious error)
713  */
714 static int
715 client_handle_transmit_close_ack (void *cls,
716                                   struct GNUNET_MESH_Tunnel *tunnel,
717                                   void **tunnel_ctx,
718                                   const struct GNUNET_PeerIdentity *sender,
719                                   const struct GNUNET_MessageHeader *message,
720                                   const struct GNUNET_ATS_Information*atsi)
721 {
722   struct GNUNET_STREAM_Socket *socket = cls;
723
724   return GNUNET_OK;
725 }
726
727
728 /**
729  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
730  *
731  * @param cls the socket (set from GNUNET_MESH_connect)
732  * @param tunnel connection to the other end
733  * @param tunnel_ctx this is NULL
734  * @param sender who sent the message
735  * @param message the actual message
736  * @param atsi performance data for the connection
737  * @return GNUNET_OK to keep the connection open,
738  *         GNUNET_SYSERR to close it (signal serious error)
739  */
740 static int
741 client_handle_receive_close (void *cls,
742                              struct GNUNET_MESH_Tunnel *tunnel,
743                              void **tunnel_ctx,
744                              const struct GNUNET_PeerIdentity *sender,
745                              const struct GNUNET_MessageHeader *message,
746                              const struct GNUNET_ATS_Information*atsi)
747 {
748   struct GNUNET_STREAM_Socket *socket = cls;
749
750   return GNUNET_OK;
751 }
752
753
754 /**
755  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
756  *
757  * @param cls the socket (set from GNUNET_MESH_connect)
758  * @param tunnel connection to the other end
759  * @param tunnel_ctx this is NULL
760  * @param sender who sent the message
761  * @param message the actual message
762  * @param atsi performance data for the connection
763  * @return GNUNET_OK to keep the connection open,
764  *         GNUNET_SYSERR to close it (signal serious error)
765  */
766 static int
767 client_handle_receive_close_ack (void *cls,
768                                  struct GNUNET_MESH_Tunnel *tunnel,
769                                  void **tunnel_ctx,
770                                  const struct GNUNET_PeerIdentity *sender,
771                                  const struct GNUNET_MessageHeader *message,
772                                  const struct GNUNET_ATS_Information*atsi)
773 {
774   struct GNUNET_STREAM_Socket *socket = cls;
775
776   return GNUNET_OK;
777 }
778
779
780 /**
781  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
782  *
783  * @param cls the socket (set from GNUNET_MESH_connect)
784  * @param tunnel connection to the other end
785  * @param tunnel_ctx this is NULL
786  * @param sender who sent the message
787  * @param message the actual message
788  * @param atsi performance data for the connection
789  * @return GNUNET_OK to keep the connection open,
790  *         GNUNET_SYSERR to close it (signal serious error)
791  */
792 static int
793 client_handle_close (void *cls,
794                      struct GNUNET_MESH_Tunnel *tunnel,
795                      void **tunnel_ctx,
796                      const struct GNUNET_PeerIdentity *sender,
797                      const struct GNUNET_MessageHeader *message,
798                      const struct GNUNET_ATS_Information*atsi)
799 {
800   struct GNUNET_STREAM_Socket *socket = cls;
801
802   return GNUNET_OK;
803 }
804
805
806 /**
807  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
808  *
809  * @param cls the socket (set from GNUNET_MESH_connect)
810  * @param tunnel connection to the other end
811  * @param tunnel_ctx this is NULL
812  * @param sender who sent the message
813  * @param message the actual message
814  * @param atsi performance data for the connection
815  * @return GNUNET_OK to keep the connection open,
816  *         GNUNET_SYSERR to close it (signal serious error)
817  */
818 static int
819 client_handle_close_ack (void *cls,
820                          struct GNUNET_MESH_Tunnel *tunnel,
821                          void **tunnel_ctx,
822                          const struct GNUNET_PeerIdentity *sender,
823                          const struct GNUNET_MessageHeader *message,
824                          const struct GNUNET_ATS_Information*atsi)
825 {
826   struct GNUNET_STREAM_Socket *socket = cls;
827
828   return GNUNET_OK;
829 }
830
831 /*****************************/
832 /* Server's Message Handlers */
833 /*****************************/
834
835 /**
836  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
837  *
838  * @param cls the closure
839  * @param tunnel connection to the other end
840  * @param tunnel_ctx the socket
841  * @param sender who sent the message
842  * @param message the actual message
843  * @param atsi performance data for the connection
844  * @return GNUNET_OK to keep the connection open,
845  *         GNUNET_SYSERR to close it (signal serious error)
846  */
847 static int
848 server_handle_data (void *cls,
849                     struct GNUNET_MESH_Tunnel *tunnel,
850                     void **tunnel_ctx,
851                     const struct GNUNET_PeerIdentity *sender,
852                     const struct GNUNET_MessageHeader *message,
853                     const struct GNUNET_ATS_Information*atsi)
854 {
855   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
856
857   return GNUNET_OK;
858 }
859
860
861 /**
862  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
863  *
864  * @param cls the closure
865  * @param tunnel connection to the other end
866  * @param tunnel_ctx the socket
867  * @param sender who sent the message
868  * @param message the actual message
869  * @param atsi performance data for the connection
870  * @return GNUNET_OK to keep the connection open,
871  *         GNUNET_SYSERR to close it (signal serious error)
872  */
873 static int
874 server_handle_hello (void *cls,
875                      struct GNUNET_MESH_Tunnel *tunnel,
876                      void **tunnel_ctx,
877                      const struct GNUNET_PeerIdentity *sender,
878                      const struct GNUNET_MessageHeader *message,
879                      const struct GNUNET_ATS_Information*atsi)
880 {
881   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
882   struct GNUNET_STREAM_HelloAckMessage *reply;
883
884   GNUNET_assert (socket->tunnel == tunnel);
885   if (STATE_INIT == socket->state)
886     {
887       /* Get the random sequence number */
888       socket->write_sequence_number = 
889         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
890       reply = 
891         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
892       reply->header.header.size = 
893         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
894       reply->header.header.type = 
895         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
896       reply->sequence_number = htonl (socket->write_sequence_number);
897       queue_message (socket, 
898                      &reply->header,
899                      &set_state_hello_wait, 
900                      NULL);
901     }
902   else
903     {
904       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905                   "Client sent HELLO when in state %d\n", socket->state);
906       /* FIXME: Send RESET? */
907       
908     }
909   return GNUNET_OK;
910 }
911
912
913 /**
914  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
915  *
916  * @param cls the closure
917  * @param tunnel connection to the other end
918  * @param tunnel_ctx the socket
919  * @param sender who sent the message
920  * @param message the actual message
921  * @param atsi performance data for the connection
922  * @return GNUNET_OK to keep the connection open,
923  *         GNUNET_SYSERR to close it (signal serious error)
924  */
925 static int
926 server_handle_hello_ack (void *cls,
927                          struct GNUNET_MESH_Tunnel *tunnel,
928                          void **tunnel_ctx,
929                          const struct GNUNET_PeerIdentity *sender,
930                          const struct GNUNET_MessageHeader *message,
931                          const struct GNUNET_ATS_Information*atsi)
932 {
933   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
934   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
935
936   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
937   GNUNET_assert (socket->tunnel == tunnel);
938   if (STATE_HELLO_WAIT == socket->state)
939     {
940       socket->read_sequence_number = ntohl (ack_message->sequence_number);
941       socket->receive_window_available = 
942         ntohl (ack_message->receive_window_size);
943       socket->state = STATE_ESTABLISHED;
944     }
945   else
946     {
947       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948                   "Client sent HELLO_ACK when in state %d\n", socket->state);
949       /* FIXME: Send RESET? */
950       
951     }
952   return GNUNET_OK;
953 }
954
955
956 /**
957  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
958  *
959  * @param cls the closure
960  * @param tunnel connection to the other end
961  * @param tunnel_ctx the socket
962  * @param sender who sent the message
963  * @param message the actual message
964  * @param atsi performance data for the connection
965  * @return GNUNET_OK to keep the connection open,
966  *         GNUNET_SYSERR to close it (signal serious error)
967  */
968 static int
969 server_handle_reset (void *cls,
970                      struct GNUNET_MESH_Tunnel *tunnel,
971                      void **tunnel_ctx,
972                      const struct GNUNET_PeerIdentity *sender,
973                      const struct GNUNET_MessageHeader *message,
974                      const struct GNUNET_ATS_Information*atsi)
975 {
976   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
977
978   return GNUNET_OK;
979 }
980
981
982 /**
983  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
984  *
985  * @param cls the closure
986  * @param tunnel connection to the other end
987  * @param tunnel_ctx the socket
988  * @param sender who sent the message
989  * @param message the actual message
990  * @param atsi performance data for the connection
991  * @return GNUNET_OK to keep the connection open,
992  *         GNUNET_SYSERR to close it (signal serious error)
993  */
994 static int
995 server_handle_transmit_close (void *cls,
996                               struct GNUNET_MESH_Tunnel *tunnel,
997                               void **tunnel_ctx,
998                               const struct GNUNET_PeerIdentity *sender,
999                               const struct GNUNET_MessageHeader *message,
1000                               const struct GNUNET_ATS_Information*atsi)
1001 {
1002   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1003
1004   return GNUNET_OK;
1005 }
1006
1007
1008 /**
1009  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1010  *
1011  * @param cls the closure
1012  * @param tunnel connection to the other end
1013  * @param tunnel_ctx the socket
1014  * @param sender who sent the message
1015  * @param message the actual message
1016  * @param atsi performance data for the connection
1017  * @return GNUNET_OK to keep the connection open,
1018  *         GNUNET_SYSERR to close it (signal serious error)
1019  */
1020 static int
1021 server_handle_transmit_close_ack (void *cls,
1022                                   struct GNUNET_MESH_Tunnel *tunnel,
1023                                   void **tunnel_ctx,
1024                                   const struct GNUNET_PeerIdentity *sender,
1025                                   const struct GNUNET_MessageHeader *message,
1026                                   const struct GNUNET_ATS_Information*atsi)
1027 {
1028   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1029
1030   return GNUNET_OK;
1031 }
1032
1033
1034 /**
1035  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1036  *
1037  * @param cls the closure
1038  * @param tunnel connection to the other end
1039  * @param tunnel_ctx the socket
1040  * @param sender who sent the message
1041  * @param message the actual message
1042  * @param atsi performance data for the connection
1043  * @return GNUNET_OK to keep the connection open,
1044  *         GNUNET_SYSERR to close it (signal serious error)
1045  */
1046 static int
1047 server_handle_receive_close (void *cls,
1048                              struct GNUNET_MESH_Tunnel *tunnel,
1049                              void **tunnel_ctx,
1050                              const struct GNUNET_PeerIdentity *sender,
1051                              const struct GNUNET_MessageHeader *message,
1052                              const struct GNUNET_ATS_Information*atsi)
1053 {
1054   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1055
1056   return GNUNET_OK;
1057 }
1058
1059
1060 /**
1061  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1062  *
1063  * @param cls the closure
1064  * @param tunnel connection to the other end
1065  * @param tunnel_ctx the socket
1066  * @param sender who sent the message
1067  * @param message the actual message
1068  * @param atsi performance data for the connection
1069  * @return GNUNET_OK to keep the connection open,
1070  *         GNUNET_SYSERR to close it (signal serious error)
1071  */
1072 static int
1073 server_handle_receive_close_ack (void *cls,
1074                                  struct GNUNET_MESH_Tunnel *tunnel,
1075                                  void **tunnel_ctx,
1076                                  const struct GNUNET_PeerIdentity *sender,
1077                                  const struct GNUNET_MessageHeader *message,
1078                                  const struct GNUNET_ATS_Information*atsi)
1079 {
1080   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1081
1082   return GNUNET_OK;
1083 }
1084
1085
1086 /**
1087  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1088  *
1089  * @param cls the closure
1090  * @param tunnel connection to the other end
1091  * @param tunnel_ctx the socket
1092  * @param sender who sent the message
1093  * @param message the actual message
1094  * @param atsi performance data for the connection
1095  * @return GNUNET_OK to keep the connection open,
1096  *         GNUNET_SYSERR to close it (signal serious error)
1097  */
1098 static int
1099 server_handle_close (void *cls,
1100                      struct GNUNET_MESH_Tunnel *tunnel,
1101                      void **tunnel_ctx,
1102                      const struct GNUNET_PeerIdentity *sender,
1103                      const struct GNUNET_MessageHeader *message,
1104                      const struct GNUNET_ATS_Information*atsi)
1105 {
1106   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1107
1108   return GNUNET_OK;
1109 }
1110
1111
1112 /**
1113  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1114  *
1115  * @param cls the closure
1116  * @param tunnel connection to the other end
1117  * @param tunnel_ctx the socket
1118  * @param sender who sent the message
1119  * @param message the actual message
1120  * @param atsi performance data for the connection
1121  * @return GNUNET_OK to keep the connection open,
1122  *         GNUNET_SYSERR to close it (signal serious error)
1123  */
1124 static int
1125 server_handle_close_ack (void *cls,
1126                          struct GNUNET_MESH_Tunnel *tunnel,
1127                          void **tunnel_ctx,
1128                          const struct GNUNET_PeerIdentity *sender,
1129                          const struct GNUNET_MessageHeader *message,
1130                          const struct GNUNET_ATS_Information*atsi)
1131 {
1132   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1133
1134   return GNUNET_OK;
1135 }
1136
1137
1138 /**
1139  * Message Handler for mesh
1140  *
1141  * @param socket the socket through which the ack was received
1142  * @param tunnel connection to the other end
1143  * @param sender who sent the message
1144  * @param ack the acknowledgment message
1145  * @param atsi performance data for the connection
1146  * @return GNUNET_OK to keep the connection open,
1147  *         GNUNET_SYSERR to close it (signal serious error)
1148  */
1149 static int
1150 handle_ack (struct GNUNET_STREAM_Socket *socket,
1151             struct GNUNET_MESH_Tunnel *tunnel,
1152             const struct GNUNET_PeerIdentity *sender,
1153             const struct GNUNET_STREAM_AckMessage *ack,
1154             const struct GNUNET_ATS_Information*atsi)
1155 {
1156   switch (socket->state)
1157     {
1158     case (STATE_ESTABLISHED):
1159       if (NULL == socket->write_handle)
1160         {
1161           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162                       "Received DATA ACK when write_handle is NULL\n");
1163           return GNUNET_OK;
1164         }
1165
1166       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1167       socket->write_handle->receive_window_available = 
1168         ntohl (ack->receive_window_remaining);
1169       write_data (socket);
1170       break;
1171     default:
1172       break;
1173     }
1174   return GNUNET_OK;
1175 }
1176
1177
1178 /**
1179  * Message Handler for mesh
1180  *
1181  * @param cls the 'struct GNUNET_STREAM_Socket'
1182  * @param tunnel connection to the other end
1183  * @param tunnel_ctx unused
1184  * @param sender who sent the message
1185  * @param message the actual message
1186  * @param atsi performance data for the connection
1187  * @return GNUNET_OK to keep the connection open,
1188  *         GNUNET_SYSERR to close it (signal serious error)
1189  */
1190 static int
1191 client_handle_ack (void *cls,
1192                    struct GNUNET_MESH_Tunnel *tunnel,
1193                    void **tunnel_ctx,
1194                    const struct GNUNET_PeerIdentity *sender,
1195                    const struct GNUNET_MessageHeader *message,
1196                    const struct GNUNET_ATS_Information*atsi)
1197 {
1198   struct GNUNET_STREAM_Socket *socket = cls;
1199   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1200  
1201   return handle_ack (socket, tunnel, sender, ack, atsi);
1202 }
1203
1204
1205 /**
1206  * Message Handler for mesh
1207  *
1208  * @param cls the server's listen socket
1209  * @param tunnel connection to the other end
1210  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1211  * @param sender who sent the message
1212  * @param message the actual message
1213  * @param atsi performance data for the connection
1214  * @return GNUNET_OK to keep the connection open,
1215  *         GNUNET_SYSERR to close it (signal serious error)
1216  */
1217 static int
1218 server_handle_ack (void *cls,
1219                    struct GNUNET_MESH_Tunnel *tunnel,
1220                    void **tunnel_ctx,
1221                    const struct GNUNET_PeerIdentity *sender,
1222                    const struct GNUNET_MessageHeader *message,
1223                    const struct GNUNET_ATS_Information*atsi)
1224 {
1225   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1226   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1227  
1228   return handle_ack (socket, tunnel, sender, ack, atsi);
1229 }
1230
1231
1232 /**
1233  * For client message handlers, the stream socket is in the
1234  * closure argument.
1235  */
1236 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1237   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1238   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1239    sizeof (struct GNUNET_STREAM_AckMessage) },
1240   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1241    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1242   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1243    sizeof (struct GNUNET_STREAM_MessageHeader)},
1244   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1245    sizeof (struct GNUNET_STREAM_MessageHeader)},
1246   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1247    sizeof (struct GNUNET_STREAM_MessageHeader)},
1248   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1249    sizeof (struct GNUNET_STREAM_MessageHeader)},
1250   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1251    sizeof (struct GNUNET_STREAM_MessageHeader)},
1252   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1253    sizeof (struct GNUNET_STREAM_MessageHeader)},
1254   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1255    sizeof (struct GNUNET_STREAM_MessageHeader)},
1256   {NULL, 0, 0}
1257 };
1258
1259
1260 /**
1261  * For server message handlers, the stream socket is in the
1262  * tunnel context, and the listen socket in the closure argument.
1263  */
1264 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1265   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1266   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1267    sizeof (struct GNUNET_STREAM_AckMessage) },
1268   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1269    sizeof (struct GNUNET_STREAM_MessageHeader)},
1270   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1271    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1272   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1273    sizeof (struct GNUNET_STREAM_MessageHeader)},
1274   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1275    sizeof (struct GNUNET_STREAM_MessageHeader)},
1276   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1277    sizeof (struct GNUNET_STREAM_MessageHeader)},
1278   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1279    sizeof (struct GNUNET_STREAM_MessageHeader)},
1280   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1281    sizeof (struct GNUNET_STREAM_MessageHeader)},
1282   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1283    sizeof (struct GNUNET_STREAM_MessageHeader)},
1284   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1285    sizeof (struct GNUNET_STREAM_MessageHeader)},
1286   {NULL, 0, 0}
1287 };
1288
1289
1290 /**
1291  * Function called when our target peer is connected to our tunnel
1292  *
1293  * @param peer the peer identity of the target
1294  * @param atsi performance data for the connection
1295  */
1296 static void
1297 mesh_peer_connect_callback (void *cls,
1298                             const struct GNUNET_PeerIdentity *peer,
1299                             const struct GNUNET_ATS_Information * atsi)
1300 {
1301   struct GNUNET_STREAM_Socket *socket = cls;
1302   struct GNUNET_STREAM_MessageHeader *message;
1303
1304   if (0 != memcmp (&socket->other_peer, 
1305                    peer, 
1306                    sizeof (struct GNUNET_PeerIdentity)))
1307     {
1308       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309                   "A peer (%s) which is not our target has connected to our tunnel", 
1310                   GNUNET_i2s (peer));
1311       return;
1312     }
1313   
1314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1315               "Target peer %s connected\n", GNUNET_i2s (peer));
1316   
1317   /* Set state to INIT */
1318   socket->state = STATE_INIT;
1319
1320   /* Send HELLO message */
1321   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1322   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1323   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1324   queue_message (socket,
1325                  message,
1326                  &set_state_hello_wait,
1327                  NULL);
1328
1329   /* Call open callback */
1330   if (NULL == socket->open_cb)
1331     {
1332       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1333                   "STREAM_open callback is NULL\n");
1334     }
1335   else
1336     {
1337       socket->open_cb (socket->open_cls, socket);
1338     }
1339 }
1340
1341
1342 /**
1343  * Function called when our target peer is disconnected from our tunnel
1344  *
1345  * @param peer the peer identity of the target
1346  */
1347 static void
1348 mesh_peer_disconnect_callback (void *cls,
1349                                const struct GNUNET_PeerIdentity *peer)
1350 {
1351
1352 }
1353
1354
1355 /*****************/
1356 /* API functions */
1357 /*****************/
1358
1359
1360 /**
1361  * Tries to open a stream to the target peer
1362  *
1363  * @param cfg configuration to use
1364  * @param target the target peer to which the stream has to be opened
1365  * @param app_port the application port number which uniquely identifies this
1366  *            stream
1367  * @param open_cb this function will be called after stream has be established 
1368  * @param open_cb_cls the closure for open_cb
1369  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1370  * @return if successful it returns the stream socket; NULL if stream cannot be
1371  *         opened 
1372  */
1373 struct GNUNET_STREAM_Socket *
1374 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1375                     const struct GNUNET_PeerIdentity *target,
1376                     GNUNET_MESH_ApplicationType app_port,
1377                     GNUNET_STREAM_OpenCallback open_cb,
1378                     void *open_cb_cls,
1379                     ...)
1380 {
1381   struct GNUNET_STREAM_Socket *socket;
1382   enum GNUNET_STREAM_Option option;
1383   va_list vargs;                /* Variable arguments */
1384
1385   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1386   socket->other_peer = *target;
1387   socket->open_cb = open_cb;
1388   socket->open_cls = open_cb_cls;
1389
1390   /* Set defaults */
1391   socket->retransmit_timeout = 
1392     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1393
1394   va_start (vargs, open_cb_cls); /* Parse variable args */
1395   do {
1396     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1397     switch (option)
1398       {
1399       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1400         /* Expect struct GNUNET_TIME_Relative */
1401         socket->retransmit_timeout = va_arg (vargs,
1402                                              struct GNUNET_TIME_Relative);
1403         break;
1404       case GNUNET_STREAM_OPTION_END:
1405         break;
1406       }
1407   } while (GNUNET_STREAM_OPTION_END != option);
1408   va_end (vargs);               /* End of variable args parsing */
1409
1410   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1411                                       1,  /* QUEUE size as parameter? */
1412                                       socket, /* cls */
1413                                       NULL, /* No inbound tunnel handler */
1414                                       NULL, /* No inbound tunnel cleaner */
1415                                       client_message_handlers,
1416                                       NULL); /* We don't get inbound tunnels */
1417   // FIXME: if (NULL == socket->mesh) ...
1418
1419   /* Now create the mesh tunnel to target */
1420   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1421                                               NULL, /* Tunnel context */
1422                                               &mesh_peer_connect_callback,
1423                                               &mesh_peer_disconnect_callback,
1424                                               socket);
1425   // FIXME: if (NULL == socket->tunnel) ...
1426
1427   return socket;
1428 }
1429
1430
1431 /**
1432  * Closes the stream
1433  *
1434  * @param socket the stream socket
1435  */
1436 void
1437 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
1438 {
1439   struct MessageQueue *head;
1440
1441   /* Clear Transmit handles */
1442   if (NULL != socket->transmit_handle)
1443     {
1444       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1445       socket->transmit_handle = NULL;
1446     }
1447
1448   /* Clear existing message queue */
1449   while (NULL != (head = socket->queue_head)) {
1450     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
1451                                  socket->queue_tail,
1452                                  head);
1453     GNUNET_free (head->message);
1454     GNUNET_free (head);
1455   }
1456
1457   /* Close associated tunnel */
1458   if (NULL != socket->tunnel)
1459     {
1460       GNUNET_MESH_tunnel_destroy (socket->tunnel);
1461       socket->tunnel = NULL;
1462     }
1463
1464   /* Close mesh connection */
1465   if (NULL != socket->mesh)
1466     {
1467       GNUNET_MESH_disconnect (socket->mesh);
1468       socket->mesh = NULL;
1469     }
1470   GNUNET_free (socket);
1471 }
1472
1473
1474 /**
1475  * Method called whenever a peer creates a tunnel to us
1476  *
1477  * @param cls closure
1478  * @param tunnel new handle to the tunnel
1479  * @param initiator peer that started the tunnel
1480  * @param atsi performance information for the tunnel
1481  * @return initial tunnel context for the tunnel
1482  *         (can be NULL -- that's not an error)
1483  */
1484 static void *
1485 new_tunnel_notify (void *cls,
1486                    struct GNUNET_MESH_Tunnel *tunnel,
1487                    const struct GNUNET_PeerIdentity *initiator,
1488                    const struct GNUNET_ATS_Information *atsi)
1489 {
1490   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1491   struct GNUNET_STREAM_Socket *socket;
1492
1493   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1494   socket->tunnel = tunnel;
1495   socket->session_id = 0;       /* FIXME */
1496   socket->other_peer = *initiator;
1497   socket->state = STATE_INIT;
1498
1499   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1500                                            socket,
1501                                            &socket->other_peer))
1502     {
1503       socket->state = STATE_CLOSED;
1504       /* FIXME: Send CLOSE message and then free */
1505       GNUNET_free (socket);
1506       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1507     }
1508   return socket;
1509 }
1510
1511
1512 /**
1513  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1514  * any associated state.  This function is NOT called if the client has
1515  * explicitly asked for the tunnel to be destroyed using
1516  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1517  * the tunnel.
1518  *
1519  * @param cls closure (set from GNUNET_MESH_connect)
1520  * @param tunnel connection to the other end (henceforth invalid)
1521  * @param tunnel_ctx place where local state associated
1522  *                   with the tunnel is stored
1523  */
1524 static void 
1525 tunnel_cleaner (void *cls,
1526                 const struct GNUNET_MESH_Tunnel *tunnel,
1527                 void *tunnel_ctx)
1528 {
1529   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1530   
1531   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1532               "Peer %s has terminated connection abruptly\n",
1533               GNUNET_i2s (&socket->other_peer));
1534
1535   socket->status = GNUNET_STREAM_SHUTDOWN;
1536   /* Clear Transmit handles */
1537   if (NULL != socket->transmit_handle)
1538     {
1539       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1540       socket->transmit_handle = NULL;
1541     }
1542   socket->tunnel = NULL;
1543 }
1544
1545
1546 /**
1547  * Listens for stream connections for a specific application ports
1548  *
1549  * @param cfg the configuration to use
1550  * @param app_port the application port for which new streams will be accepted
1551  * @param listen_cb this function will be called when a peer tries to establish
1552  *            a stream with us
1553  * @param listen_cb_cls closure for listen_cb
1554  * @return listen socket, NULL for any error
1555  */
1556 struct GNUNET_STREAM_ListenSocket *
1557 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
1558                       GNUNET_MESH_ApplicationType app_port,
1559                       GNUNET_STREAM_ListenCallback listen_cb,
1560                       void *listen_cb_cls)
1561 {
1562   /* FIXME: Add variable args for passing configration options? */
1563   struct GNUNET_STREAM_ListenSocket *lsocket;
1564   GNUNET_MESH_ApplicationType app_types[2];
1565
1566   app_types[0] = app_port;
1567   app_types[1] = 0;
1568   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
1569   lsocket->port = app_port;
1570   lsocket->listen_cb = listen_cb;
1571   lsocket->listen_cb_cls = listen_cb_cls;
1572   lsocket->mesh = GNUNET_MESH_connect (cfg,
1573                                        10, /* FIXME: QUEUE size as parameter? */
1574                                        lsocket, /* Closure */
1575                                        &new_tunnel_notify,
1576                                        &tunnel_cleaner,
1577                                        server_message_handlers,
1578                                        app_types);
1579   return lsocket;
1580 }
1581
1582
1583 /**
1584  * Closes the listen socket
1585  *
1586  * @param socket the listen socket
1587  */
1588 void
1589 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1590 {
1591   /* Close MESH connection */
1592   GNUNET_MESH_disconnect (lsocket->mesh);
1593   
1594   GNUNET_free (lsocket);
1595 }
1596
1597
1598 /**
1599  * Tries to write the given data to the stream
1600  *
1601  * @param socket the socket representing a stream
1602  * @param data the data buffer from where the data is written into the stream
1603  * @param size the number of bytes to be written from the data buffer
1604  * @param timeout the timeout period
1605  * @param write_cont the function to call upon writing some bytes into the stream
1606  * @param write_cont_cls the closure
1607  * @return handle to cancel the operation
1608  */
1609 struct GNUNET_STREAM_IOHandle *
1610 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1611                      const void *data,
1612                      size_t size,
1613                      struct GNUNET_TIME_Relative timeout,
1614                      GNUNET_STREAM_CompletionContinuation write_cont,
1615                      void *write_cont_cls)
1616 {
1617   unsigned int num_needed_packets;
1618   unsigned int packet;
1619   struct GNUNET_STREAM_IOHandle *io_handle;
1620   struct GNUNET_STREAM_DataMessage *data_msg;
1621   size_t packet_size;
1622   const void *sweep;
1623
1624   /* There is already a write request pending */
1625   if (NULL != socket->write_handle) return NULL;
1626   if (!(STATE_ESTABLISHED == socket->state 
1627         || STATE_RECEIVE_CLOSE_WAIT == socket->state
1628         || STATE_RECEIVE_CLOSED == socket->state))
1629     {
1630       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1631                   "Attempting to write on a closed/not-yet-established stream\n");
1632       return NULL;
1633     }
1634       
1635   num_needed_packets = ceil (size / max_payload_size);
1636   if (64 < num_needed_packets) 
1637     {
1638       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1639                   "Given buffer cannot be accommodated in 64 packets\n");
1640       num_needed_packets = 64;
1641     }
1642
1643   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
1644   io_handle->receive_window_available = socket->receive_window_available;
1645   sweep = data;
1646   /* Divide the given buffer into packets for sending */
1647   for (packet=0; packet < num_needed_packets; packet++)
1648     {
1649       if ((packet + 1) * max_payload_size < size) 
1650         {
1651           packet_size = MAX_PACKET_SIZE;
1652         }
1653       else 
1654         {
1655           packet_size = size - packet * max_payload_size
1656             + sizeof (struct GNUNET_STREAM_DataMessage);
1657         }
1658       io_handle->messages[packet] = GNUNET_malloc (packet_size);
1659       io_handle->messages[packet]->header.header.size = htons (packet_size);
1660       io_handle->messages[packet]->header.header.type =
1661         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
1662       io_handle->messages[packet]->sequence_number =
1663         htons (socket->write_sequence_number++);
1664       data_msg = io_handle->messages[packet];
1665       memcpy (&data_msg[1],
1666               sweep,
1667               packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
1668       sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
1669     }
1670
1671   write_data (socket);
1672
1673   return io_handle;
1674 }