71c4e61c12d2408e18b0dd7afd9ae8c8001f125d
[oweals/gnunet.git] / src / stream / stream_api.c
1 /*
2   This file is part of GNUnet.
3   (C) 2012 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /* TODO:
22  * Copy MESH handle from lsocket to socket
23  *
24  * Checks for matching the sender and socket->other_peer in server
25  * message handlers  */
26
27 /**
28  * @file stream/stream_api.c
29  * @brief Implementation of the stream library
30  * @author Sree Harsha Totakura
31  */
32 #include "platform.h"
33 #include "gnunet_common.h"
34 #include "gnunet_crypto_lib.h"
35 #include "gnunet_stream_lib.h"
36 #include "stream_protocol.h"
37
38
39 /**
40  * The maximum packet size of a stream packet
41  */
42 #define MAX_PACKET_SIZE 64000
43
44 /**
45  * The maximum payload a data message packet can carry
46  */
47 static size_t max_payload_size = 
48   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
49
50 /**
51  * Receive buffer
52  */
53 #define RECEIVE_BUFFER_SIZE 4096000
54
55 /**
56  * states in the Protocol
57  */
58 enum State
59   {
60     /**
61      * Client initialization state
62      */
63     STATE_INIT,
64
65     /**
66      * Listener initialization state 
67      */
68     STATE_LISTEN,
69
70     /**
71      * Pre-connection establishment state
72      */
73     STATE_HELLO_WAIT,
74
75     /**
76      * State where a connection has been established
77      */
78     STATE_ESTABLISHED,
79
80     /**
81      * State where the socket is closed on our side and waiting to be ACK'ed
82      */
83     STATE_RECEIVE_CLOSE_WAIT,
84
85     /**
86      * State where the socket is closed for reading
87      */
88     STATE_RECEIVE_CLOSED,
89
90     /**
91      * State where the socket is closed on our side and waiting to be ACK'ed
92      */
93     STATE_TRANSMIT_CLOSE_WAIT,
94
95     /**
96      * State where the socket is closed for writing
97      */
98     STATE_TRANSMIT_CLOSED,
99
100     /**
101      * State where the socket is closed on our side and waiting to be ACK'ed
102      */
103     STATE_CLOSE_WAIT,
104
105     /**
106      * State where the socket is closed
107      */
108     STATE_CLOSED 
109   };
110
111
112 /**
113  * Functions of this type are called when a message is written
114  *
115  * @param cls the closure from queue_message
116  * @param socket the socket the written message was bound to
117  */
118 typedef void (*SendFinishCallback) (void *cls,
119                                     struct GNUNET_STREAM_Socket *socket);
120
121
122 /**
123  * The send message queue
124  */
125 struct MessageQueue
126 {
127   /**
128    * The message
129    */
130   struct GNUNET_STREAM_MessageHeader *message;
131
132   /**
133    * Callback to be called when the message is sent
134    */
135   SendFinishCallback finish_cb;
136
137   /**
138    * The closure for finish_cb
139    */
140   void *finish_cb_cls;
141
142   /**
143    * The next message in queue. Should be NULL in the last message
144    */
145   struct MessageQueue *next;
146
147   /**
148    * The next message in queue. Should be NULL in the first message
149    */
150   struct MessageQueue *prev;
151 };
152
153
154 /**
155  * The STREAM Socket Handler
156  */
157 struct GNUNET_STREAM_Socket
158 {
159
160   /**
161    * The peer identity of the peer at the other end of the stream
162    */
163   struct GNUNET_PeerIdentity other_peer;
164
165   /**
166    * Retransmission timeout
167    */
168   struct GNUNET_TIME_Relative retransmit_timeout;
169
170   /**
171    * The Acknowledgement Bitmap
172    */
173   GNUNET_STREAM_AckBitmap ack_bitmap;
174
175   /**
176    * Time when the Acknowledgement was queued
177    */
178   struct GNUNET_TIME_Absolute ack_time_registered;
179
180   /**
181    * Queued Acknowledgement deadline
182    */
183   struct GNUNET_TIME_Relative ack_time_deadline;
184
185   /**
186    * The task for sending timely Acks
187    */
188   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
189
190   /**
191    * Task scheduled to continue a read operation.
192    */
193   GNUNET_SCHEDULER_TaskIdentifier read_task;
194
195   /**
196    * The mesh handle
197    */
198   struct GNUNET_MESH_Handle *mesh;
199
200   /**
201    * The mesh tunnel handle
202    */
203   struct GNUNET_MESH_Tunnel *tunnel;
204
205   /**
206    * Stream open closure
207    */
208   void *open_cls;
209
210   /**
211    * Stream open callback
212    */
213   GNUNET_STREAM_OpenCallback open_cb;
214
215   /**
216    * The current transmit handle (if a pending transmit request exists)
217    */
218   struct GNUNET_MESH_TransmitHandle *transmit_handle;
219
220   /**
221    * The current message associated with the transmit handle
222    */
223   struct MessageQueue *queue_head;
224
225   /**
226    * The queue tail, should always point to the last message in queue
227    */
228   struct MessageQueue *queue_tail;
229
230   /**
231    * The write IO_handle associated with this socket
232    */
233   struct GNUNET_STREAM_IOWriteHandle *write_handle;
234
235   /**
236    * The read IO_handle associated with this socket
237    */
238   struct GNUNET_STREAM_IOReadHandle *read_handle;
239
240   /**
241    * Buffer for storing received messages
242    */
243   void *receive_buffer;
244
245   /**
246    * Task identifier for the read io timeout task
247    */
248   GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
249
250   /**
251    * The state of the protocol associated with this socket
252    */
253   enum State state;
254
255   /**
256    * The status of the socket
257    */
258   enum GNUNET_STREAM_Status status;
259
260   /**
261    * The number of previous timeouts; FIXME: currently not used
262    */
263   unsigned int retries;
264
265   /**
266    * Is this socket derived from listen socket?
267    */
268   unsigned int derived;
269   
270   /**
271    * The application port number (type: uint32_t)
272    */
273   GNUNET_MESH_ApplicationType app_port;
274
275   /**
276    * The session id associated with this stream connection
277    * FIXME: Not used currently, may be removed
278    */
279   uint32_t session_id;
280
281   /**
282    * Write sequence number. Set to random when sending HELLO(client) and
283    * HELLO_ACK(server) 
284    */
285   uint32_t write_sequence_number;
286
287   /**
288    * Read sequence number. This number's value is determined during handshake
289    */
290   uint32_t read_sequence_number;
291
292   /**
293    * The receiver buffer size
294    */
295   uint32_t receive_buffer_size;
296
297   /**
298    * The receiver buffer boundaries
299    */
300   uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
301
302   /**
303    * receiver's available buffer after the last acknowledged packet
304    */
305   uint32_t receive_window_available;
306
307   /**
308    * The offset pointer used during write operation
309    */
310   uint32_t write_offset;
311
312   /**
313    * The offset after which we are expecting data
314    */
315   uint32_t read_offset;
316
317   /**
318    * The offset upto which user has read from the received buffer
319    */
320   uint32_t copy_offset;
321 };
322
323
324 /**
325  * A socket for listening
326  */
327 struct GNUNET_STREAM_ListenSocket
328 {
329
330   /**
331    * The mesh handle
332    */
333   struct GNUNET_MESH_Handle *mesh;
334
335   /**
336    * The callback function which is called after successful opening socket
337    */
338   GNUNET_STREAM_ListenCallback listen_cb;
339
340   /**
341    * The call back closure
342    */
343   void *listen_cb_cls;
344
345   /**
346    * The service port
347    * FIXME: Remove if not required!
348    */
349   GNUNET_MESH_ApplicationType port;
350 };
351
352
353 /**
354  * The IO Write Handle
355  */
356 struct GNUNET_STREAM_IOWriteHandle
357 {
358   /**
359    * The packet_buffers associated with this Handle
360    */
361   struct GNUNET_STREAM_DataMessage *messages[64];
362
363   /**
364    * The bitmap of this IOHandle; Corresponding bit for a message is set when
365    * it has been acknowledged by the receiver
366    */
367   GNUNET_STREAM_AckBitmap ack_bitmap;
368
369   /**
370    * Number of packets sent before waiting for an ack
371    *
372    * FIXME: Do we need this?
373    */
374   unsigned int sent_packets;
375 };
376
377
378 /**
379  * The IO Read Handle
380  */
381 struct GNUNET_STREAM_IOReadHandle
382 {
383   /**
384    * Callback for the read processor
385    */
386   GNUNET_STREAM_DataProcessor proc;
387
388   /**
389    * The closure pointer for the read processor callback
390    */
391   void *proc_cls;
392 };
393
394
395 /**
396  * Default value in seconds for various timeouts
397  */
398 static unsigned int default_timeout = 300;
399
400
401 /**
402  * Callback function for sending hello message
403  *
404  * @param cls closure the socket
405  * @param size number of bytes available in buf
406  * @param buf where the callee should write the message
407  * @return number of bytes written to buf
408  */
409 static size_t
410 send_message_notify (void *cls, size_t size, void *buf)
411 {
412   struct GNUNET_STREAM_Socket *socket = cls;
413   struct MessageQueue *head;
414   size_t ret;
415
416   socket->transmit_handle = NULL; /* Remove the transmit handle */
417   head = socket->queue_head;
418   if (NULL == head)
419     return 0; /* just to be safe */
420   if (0 == size)                /* request timed out */
421     {
422       socket->retries++;
423       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
424                   "Message sending timed out. Retry %d \n",
425                   socket->retries);
426       socket->transmit_handle = 
427         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
428                                            0, /* Corking */
429                                            1, /* Priority */
430                                            /* FIXME: exponential backoff */
431                                            socket->retransmit_timeout,
432                                            &socket->other_peer,
433                                            ntohs (head->message->header.size),
434                                            &send_message_notify,
435                                            socket);
436       return 0;
437     }
438
439   ret = ntohs (head->message->header.size);
440   GNUNET_assert (size >= ret);
441   memcpy (buf, head->message, ret);
442   if (NULL != head->finish_cb)
443     {
444       head->finish_cb (head->finish_cb_cls, socket);
445     }
446   GNUNET_CONTAINER_DLL_remove (socket->queue_head,
447                                socket->queue_tail,
448                                head);
449   GNUNET_free (head->message);
450   GNUNET_free (head);
451   head = socket->queue_head;
452   if (NULL != head)    /* more pending messages to send */
453     {
454       socket->retries = 0;
455       socket->transmit_handle = 
456         GNUNET_MESH_notify_transmit_ready (socket->tunnel,
457                                            0, /* Corking */
458                                            1, /* Priority */
459                                            /* FIXME: exponential backoff */
460                                            socket->retransmit_timeout,
461                                            &socket->other_peer,
462                                            ntohs (head->message->header.size),
463                                            &send_message_notify,
464                                            socket);
465     }
466   return ret;
467 }
468
469
470 /**
471  * Queues a message for sending using the mesh connection of a socket
472  *
473  * @param socket the socket whose mesh connection is used
474  * @param message the message to be sent
475  * @param finish_cb the callback to be called when the message is sent
476  * @param finish_cb_cls the closure for the callback
477  */
478 static void
479 queue_message (struct GNUNET_STREAM_Socket *socket,
480                struct GNUNET_STREAM_MessageHeader *message,
481                SendFinishCallback finish_cb,
482                void *finish_cb_cls)
483 {
484   struct MessageQueue *queue_entity;
485
486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487               "Queueing message of type %d and size %d\n",
488               ntohs (message->header.type),
489               ntohs (message->header.size));
490   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
491   queue_entity->message = message;
492   queue_entity->finish_cb = finish_cb;
493   queue_entity->finish_cb_cls = finish_cb_cls;
494   GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
495                                     socket->queue_tail,
496                                     queue_entity);
497   if (NULL == socket->transmit_handle)
498   {
499     socket->retries = 0;
500     socket->transmit_handle = 
501       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
502                                          0, /* Corking */
503                                          1, /* Priority */
504                                          socket->retransmit_timeout,
505                                          &socket->other_peer,
506                                          ntohs (message->header.size),
507                                          &send_message_notify,
508                                          socket);
509   }
510 }
511
512
513 /**
514  * Callback function for sending ack message
515  *
516  * @param cls closure the ACK message created in ack_task
517  * @param size number of bytes available in buffer
518  * @param buf where the callee should write the message
519  * @return number of bytes written to buf
520  */
521 static size_t
522 send_ack_notify (void *cls, size_t size, void *buf)
523 {
524   struct GNUNET_STREAM_AckMessage *ack_msg = cls;
525
526   if (0 == size)
527     {
528       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529                   "%s called with size 0\n", __func__);
530       return 0;
531     }
532   GNUNET_assert (ack_msg->header.header.size <= size);
533   
534   size = ack_msg->header.header.size;
535   memcpy (buf, ack_msg, size);
536   return size;
537 }
538
539
540 /**
541  * Task for sending ACK message
542  *
543  * @param cls the socket
544  * @param tc the Task context
545  */
546 static void
547 ack_task (void *cls,
548           const struct GNUNET_SCHEDULER_TaskContext *tc)
549 {
550   struct GNUNET_STREAM_Socket *socket = cls;
551   struct GNUNET_STREAM_AckMessage *ack_msg;
552
553   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
554     {
555       return;
556     }
557
558   socket->ack_task_id = 0;
559
560   /* Create the ACK Message */
561   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
562   ack_msg->header.header.size = htons (sizeof (struct 
563                                                GNUNET_STREAM_AckMessage));
564   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
565   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
566   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
567   ack_msg->receive_window_remaining = 
568     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
569
570   /* Request MESH for sending ACK */
571   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
572                                      0, /* Corking */
573                                      1, /* Priority */
574                                      socket->retransmit_timeout,
575                                      &socket->other_peer,
576                                      ntohs (ack_msg->header.header.size),
577                                      &send_ack_notify,
578                                      ack_msg);
579
580   
581 }
582
583
584 /**
585  * Function to modify a bit in GNUNET_STREAM_AckBitmap
586  *
587  * @param bitmap the bitmap to modify
588  * @param bit the bit number to modify
589  * @param value GNUNET_YES to on, GNUNET_NO to off
590  */
591 static void
592 ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
593                       unsigned int bit, 
594                       int value)
595 {
596   GNUNET_assert (bit < 64);
597   if (GNUNET_YES == value)
598     *bitmap |= (1LL << bit);
599   else
600     *bitmap &= ~(1LL << bit);
601 }
602
603
604 /**
605  * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
606  *
607  * @param bitmap address of the bitmap that has to be checked
608  * @param bit the bit number to check
609  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
610  */
611 static uint8_t
612 ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
613                       unsigned int bit)
614 {
615   GNUNET_assert (bit < 64);
616   return 0 != (*bitmap & (1LL << bit));
617 }
618
619
620
621 /**
622  * Function called when Data Message is sent
623  *
624  * @param cls the io_handle corresponding to the Data Message
625  * @param socket the socket which was used
626  */
627 static void
628 write_data_finish_cb (void *cls,
629                       struct GNUNET_STREAM_Socket *socket)
630 {
631   struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
632
633   io_handle->sent_packets++;
634 }
635
636
637 /**
638  * Writes data using the given socket. The amount of data written is limited by
639  * the receive_window_size
640  *
641  * @param socket the socket to use
642  */
643 static void 
644 write_data (struct GNUNET_STREAM_Socket *socket)
645 {
646   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
647   unsigned int packet;
648   int ack_packet;
649
650   ack_packet = -1;
651   /* Find the last acknowledged packet */
652   for (packet=0; packet < 64; packet++)
653     {      
654       if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
655                                               packet))
656         ack_packet = packet;        
657       else if (NULL == io_handle->messages[packet])
658         break;
659     }
660   /* Resend packets which weren't ack'ed */
661   for (packet=0; packet < ack_packet; packet++)
662     {
663       if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
664                                              packet))
665         {
666           queue_message (socket,
667                          &io_handle->messages[packet]->header,
668                          NULL,
669                          NULL);
670         }
671     }
672   packet = ack_packet + 1;
673   /* Now send new packets if there is enough buffer space */
674   while ( (NULL != io_handle->messages[packet]) &&
675           (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
676     {
677       socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
678       queue_message (socket,
679                      &io_handle->messages[packet]->header,
680                      &write_data_finish_cb,
681                      io_handle);
682       packet++;
683     }
684 }
685
686
687 /**
688  * Task for calling the read processor
689  *
690  * @param cls the socket
691  * @param tc the task context
692  */
693 static void
694 call_read_processor (void *cls,
695                           const struct GNUNET_SCHEDULER_TaskContext *tc)
696 {
697   struct GNUNET_STREAM_Socket *socket = cls;
698   size_t read_size;
699   size_t valid_read_size;
700   unsigned int packet;
701   uint32_t sequence_increase;
702   uint32_t offset_increase;
703
704   socket->read_task = GNUNET_SCHEDULER_NO_TASK;
705   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
706     return;
707
708   GNUNET_assert (NULL != socket->read_handle);
709   GNUNET_assert (NULL != socket->read_handle->proc);
710
711   /* Check the bitmap for any holes */
712   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
713     {
714       if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
715                                              packet))
716         break;
717     }
718   /* We only call read processor if we have the first packet */
719   GNUNET_assert (0 < packet);
720
721   valid_read_size = 
722     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
723
724   GNUNET_assert (0 != valid_read_size);
725
726   /* Cancel the read_io_timeout_task */
727   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
728   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
729
730   /* Call the data processor */
731   read_size = 
732     socket->read_handle->proc (socket->read_handle->proc_cls,
733                                socket->status,
734                                socket->receive_buffer + socket->copy_offset,
735                                valid_read_size);
736   /* Free the read handle */
737   GNUNET_free (socket->read_handle);
738   socket->read_handle = NULL;
739
740   GNUNET_assert (read_size <= valid_read_size);
741   socket->copy_offset += read_size;
742
743   /* Determine upto which packet we can remove from the buffer */
744   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
745     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
746       break;
747
748   /* If no packets can be removed we can't move the buffer */
749   if (0 == packet) return;
750
751   sequence_increase = packet;
752
753   /* Shift the data in the receive buffer */
754   memmove (socket->receive_buffer,
755            socket->receive_buffer 
756            + socket->receive_buffer_boundaries[sequence_increase-1],
757            socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
758   
759   /* Shift the bitmap */
760   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
761   
762   /* Set read_sequence_number */
763   socket->read_sequence_number += sequence_increase;
764   
765   /* Set read_offset */
766   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
767   socket->read_offset += offset_increase;
768
769   /* Fix copy_offset */
770   GNUNET_assert (offset_increase <= socket->copy_offset);
771   socket->copy_offset -= offset_increase;
772   
773   /* Fix relative boundaries */
774   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
775     {
776       if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
777         {
778           socket->receive_buffer_boundaries[packet] = 
779             socket->receive_buffer_boundaries[packet + sequence_increase] 
780             - offset_increase;
781         }
782       else
783         socket->receive_buffer_boundaries[packet] = 0;
784     }
785 }
786
787
788 /**
789  * Cancels the existing read io handle
790  *
791  * @param cls the closure from the SCHEDULER call
792  * @param tc the task context
793  */
794 static void
795 read_io_timeout (void *cls, 
796                 const struct GNUNET_SCHEDULER_TaskContext *tc)
797 {
798   struct GNUNET_STREAM_Socket *socket = cls;
799
800   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
801   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
802   {
803     GNUNET_SCHEDULER_cancel (socket->read_task);
804     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
805   }
806   GNUNET_assert (NULL != socket->read_handle);
807   
808   GNUNET_free (socket->read_handle);
809   socket->read_handle = NULL;
810 }
811
812
813 /**
814  * Handler for DATA messages; Same for both client and server
815  *
816  * @param socket the socket through which the ack was received
817  * @param tunnel connection to the other end
818  * @param sender who sent the message
819  * @param msg the data message
820  * @param atsi performance data for the connection
821  * @return GNUNET_OK to keep the connection open,
822  *         GNUNET_SYSERR to close it (signal serious error)
823  */
824 static int
825 handle_data (struct GNUNET_STREAM_Socket *socket,
826              struct GNUNET_MESH_Tunnel *tunnel,
827              const struct GNUNET_PeerIdentity *sender,
828              const struct GNUNET_STREAM_DataMessage *msg,
829              const struct GNUNET_ATS_Information*atsi)
830 {
831   const void *payload;
832   uint32_t bytes_needed;
833   uint32_t relative_offset;
834   uint32_t relative_sequence_number;
835   uint16_t size;
836
837   size = htons (msg->header.header.size);
838   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
839     {
840       GNUNET_break_op (0);
841       return GNUNET_SYSERR;
842     }
843
844   switch (socket->state)
845     {
846     case STATE_ESTABLISHED:
847     case STATE_TRANSMIT_CLOSED:
848     case STATE_TRANSMIT_CLOSE_WAIT:
849
850       /* check if the message's sequence number is in the range we are
851          expecting */
852       relative_sequence_number = 
853         ntohl (msg->sequence_number) - socket->read_sequence_number;
854       if ( relative_sequence_number > 64)
855         {
856           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857                       "Ignoring received message with sequence number %d",
858                       ntohl (msg->sequence_number));
859           return GNUNET_YES;
860         }
861
862       /* Check if we have to allocate the buffer */
863       size -= sizeof (struct GNUNET_STREAM_DataMessage);
864       relative_offset = ntohl (msg->offset) - socket->read_offset;
865       bytes_needed = relative_offset + size;
866       
867       if (bytes_needed > socket->receive_buffer_size)
868         {
869           if (bytes_needed <= RECEIVE_BUFFER_SIZE)
870             {
871               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
872                                                        bytes_needed);
873               socket->receive_buffer_size = bytes_needed;
874             }
875           else
876             {
877               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878                           "Cannot accommodate packet %d as buffer is full\n",
879                           ntohl (msg->sequence_number));
880               return GNUNET_YES;
881             }
882         }
883       
884       /* Copy Data to buffer */
885       payload = &msg[1];
886       GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
887       memcpy (socket->receive_buffer + relative_offset,
888               payload,
889               size);
890       socket->receive_buffer_boundaries[relative_sequence_number] = 
891         relative_offset + size;
892       
893       /* Modify the ACK bitmap */
894       ackbitmap_modify_bit (&socket->ack_bitmap,
895                             relative_sequence_number,
896                             GNUNET_YES);
897
898       /* Start ACK sending task if one is not already present */
899       if (0 == socket->ack_task_id)
900        {
901          socket->ack_task_id = 
902            GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
903                                          (msg->ack_deadline),
904                                          &ack_task,
905                                          socket);
906        }
907
908       if ((NULL != socket->read_handle) /* A read handle is waiting */
909           /* There is no current read task */
910           && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
911           /* We have the first packet */
912           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
913                                                  0)))
914         {
915           socket->read_task = 
916             GNUNET_SCHEDULER_add_now (&call_read_processor,
917                                       socket);
918         }
919       
920       break;
921
922     default:
923       /* FIXME: call statistics */
924       break;
925     }
926   return GNUNET_YES;
927 }
928
929 /**
930  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
931  *
932  * @param cls the socket (set from GNUNET_MESH_connect)
933  * @param tunnel connection to the other end
934  * @param tunnel_ctx place to store local state associated with the tunnel
935  * @param sender who sent the message
936  * @param message the actual message
937  * @param atsi performance data for the connection
938  * @return GNUNET_OK to keep the connection open,
939  *         GNUNET_SYSERR to close it (signal serious error)
940  */
941 static int
942 client_handle_data (void *cls,
943              struct GNUNET_MESH_Tunnel *tunnel,
944              void **tunnel_ctx,
945              const struct GNUNET_PeerIdentity *sender,
946              const struct GNUNET_MessageHeader *message,
947              const struct GNUNET_ATS_Information*atsi)
948 {
949   struct GNUNET_STREAM_Socket *socket = cls;
950
951   return handle_data (socket, 
952                       tunnel, 
953                       sender, 
954                       (const struct GNUNET_STREAM_DataMessage *) message, 
955                       atsi);
956 }
957
958
959 /**
960  * Callback to set state to ESTABLISHED
961  *
962  * @param cls the closure from queue_message FIXME: document
963  * @param socket the socket to requiring state change
964  */
965 static void
966 set_state_established (void *cls,
967                        struct GNUNET_STREAM_Socket *socket)
968 {
969   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
970   socket->write_offset = 0;
971   socket->read_offset = 0;
972   socket->state = STATE_ESTABLISHED;
973   if (socket->open_cb)
974     socket->open_cb (socket->open_cls, socket);
975 }
976
977
978 /**
979  * Callback to set state to HELLO_WAIT
980  *
981  * @param cls the closure from queue_message
982  * @param socket the socket to requiring state change
983  */
984 static void
985 set_state_hello_wait (void *cls,
986                       struct GNUNET_STREAM_Socket *socket)
987 {
988   GNUNET_assert (STATE_INIT == socket->state);
989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
990   socket->state = STATE_HELLO_WAIT;
991 }
992
993
994 /**
995  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
996  *
997  * @param cls the socket (set from GNUNET_MESH_connect)
998  * @param tunnel connection to the other end
999  * @param tunnel_ctx this is NULL
1000  * @param sender who sent the message
1001  * @param message the actual message
1002  * @param atsi performance data for the connection
1003  * @return GNUNET_OK to keep the connection open,
1004  *         GNUNET_SYSERR to close it (signal serious error)
1005  */
1006 static int
1007 client_handle_hello_ack (void *cls,
1008                          struct GNUNET_MESH_Tunnel *tunnel,
1009                          void **tunnel_ctx,
1010                          const struct GNUNET_PeerIdentity *sender,
1011                          const struct GNUNET_MessageHeader *message,
1012                          const struct GNUNET_ATS_Information*atsi)
1013 {
1014   struct GNUNET_STREAM_Socket *socket = cls;
1015   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1016   struct GNUNET_STREAM_HelloAckMessage *reply;
1017
1018   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1019   GNUNET_assert (socket->tunnel == tunnel);
1020   switch (socket->state)
1021   {
1022   case STATE_HELLO_WAIT:
1023       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1024       socket->receive_window_available = ntohl (ack_msg->receive_window_size);
1025       /* Get the random sequence number */
1026       socket->write_sequence_number = 
1027         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1028       reply = 
1029         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1030       reply->header.header.size = 
1031         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1032       reply->header.header.type = 
1033         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1034       reply->sequence_number = htonl (socket->write_sequence_number);
1035       reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
1036       queue_message (socket, 
1037                      &reply->header, 
1038                      &set_state_established, 
1039                      NULL);      
1040       return GNUNET_OK;
1041   case STATE_ESTABLISHED:
1042   case STATE_RECEIVE_CLOSE_WAIT:
1043     // call statistics (# ACKs ignored++)
1044     return GNUNET_OK;
1045   case STATE_INIT:
1046   default:
1047     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048                 "Server sent HELLO_ACK when in state %d\n", socket->state);
1049     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1050     return GNUNET_SYSERR;
1051   }
1052
1053 }
1054
1055
1056 /**
1057  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1058  *
1059  * @param cls the socket (set from GNUNET_MESH_connect)
1060  * @param tunnel connection to the other end
1061  * @param tunnel_ctx this is NULL
1062  * @param sender who sent the message
1063  * @param message the actual message
1064  * @param atsi performance data for the connection
1065  * @return GNUNET_OK to keep the connection open,
1066  *         GNUNET_SYSERR to close it (signal serious error)
1067  */
1068 static int
1069 client_handle_reset (void *cls,
1070                      struct GNUNET_MESH_Tunnel *tunnel,
1071                      void **tunnel_ctx,
1072                      const struct GNUNET_PeerIdentity *sender,
1073                      const struct GNUNET_MessageHeader *message,
1074                      const struct GNUNET_ATS_Information*atsi)
1075 {
1076   struct GNUNET_STREAM_Socket *socket = cls;
1077
1078   return GNUNET_OK;
1079 }
1080
1081
1082 /**
1083  * Common message handler for handling TRANSMIT_CLOSE messages
1084  *
1085  * @param socket the socket through which the ack was received
1086  * @param tunnel connection to the other end
1087  * @param sender who sent the message
1088  * @param msg the transmit close message
1089  * @param atsi performance data for the connection
1090  * @return GNUNET_OK to keep the connection open,
1091  *         GNUNET_SYSERR to close it (signal serious error)
1092  */
1093 static int
1094 handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1095                        struct GNUNET_MESH_Tunnel *tunnel,
1096                        const struct GNUNET_PeerIdentity *sender,
1097                        const struct GNUNET_STREAM_MessageHeader *msg,
1098                        const struct GNUNET_ATS_Information*atsi)
1099 {
1100   struct GNUNET_STREAM_MessageHeader *reply;
1101
1102   switch (socket->state)
1103     {
1104     case STATE_ESTABLISHED:
1105       socket->state = STATE_RECEIVE_CLOSED;
1106
1107       /* Send TRANSMIT_CLOSE_ACK */
1108       reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1109       reply->header.type = 
1110         htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1111       reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1112       queue_message (socket, reply, NULL, NULL);
1113       break;
1114
1115     default:
1116       /* FIXME: Call statistics? */
1117       break;
1118     }
1119   return GNUNET_YES;
1120 }
1121
1122
1123 /**
1124  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1125  *
1126  * @param cls the socket (set from GNUNET_MESH_connect)
1127  * @param tunnel connection to the other end
1128  * @param tunnel_ctx this is NULL
1129  * @param sender who sent the message
1130  * @param message the actual message
1131  * @param atsi performance data for the connection
1132  * @return GNUNET_OK to keep the connection open,
1133  *         GNUNET_SYSERR to close it (signal serious error)
1134  */
1135 static int
1136 client_handle_transmit_close (void *cls,
1137                               struct GNUNET_MESH_Tunnel *tunnel,
1138                               void **tunnel_ctx,
1139                               const struct GNUNET_PeerIdentity *sender,
1140                               const struct GNUNET_MessageHeader *message,
1141                               const struct GNUNET_ATS_Information*atsi)
1142 {
1143   struct GNUNET_STREAM_Socket *socket = cls;
1144   
1145   return handle_transmit_close (socket,
1146                                 tunnel,
1147                                 sender,
1148                                 (struct GNUNET_STREAM_MessageHeader *)message,
1149                                 atsi);
1150 }
1151
1152
1153 /**
1154  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1155  *
1156  * @param cls the socket (set from GNUNET_MESH_connect)
1157  * @param tunnel connection to the other end
1158  * @param tunnel_ctx this is NULL
1159  * @param sender who sent the message
1160  * @param message the actual message
1161  * @param atsi performance data for the connection
1162  * @return GNUNET_OK to keep the connection open,
1163  *         GNUNET_SYSERR to close it (signal serious error)
1164  */
1165 static int
1166 client_handle_transmit_close_ack (void *cls,
1167                                   struct GNUNET_MESH_Tunnel *tunnel,
1168                                   void **tunnel_ctx,
1169                                   const struct GNUNET_PeerIdentity *sender,
1170                                   const struct GNUNET_MessageHeader *message,
1171                                   const struct GNUNET_ATS_Information*atsi)
1172 {
1173   struct GNUNET_STREAM_Socket *socket = cls;
1174
1175   return GNUNET_OK;
1176 }
1177
1178
1179 /**
1180  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1181  *
1182  * @param cls the socket (set from GNUNET_MESH_connect)
1183  * @param tunnel connection to the other end
1184  * @param tunnel_ctx this is NULL
1185  * @param sender who sent the message
1186  * @param message the actual message
1187  * @param atsi performance data for the connection
1188  * @return GNUNET_OK to keep the connection open,
1189  *         GNUNET_SYSERR to close it (signal serious error)
1190  */
1191 static int
1192 client_handle_receive_close (void *cls,
1193                              struct GNUNET_MESH_Tunnel *tunnel,
1194                              void **tunnel_ctx,
1195                              const struct GNUNET_PeerIdentity *sender,
1196                              const struct GNUNET_MessageHeader *message,
1197                              const struct GNUNET_ATS_Information*atsi)
1198 {
1199   struct GNUNET_STREAM_Socket *socket = cls;
1200
1201   return GNUNET_OK;
1202 }
1203
1204
1205 /**
1206  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1207  *
1208  * @param cls the socket (set from GNUNET_MESH_connect)
1209  * @param tunnel connection to the other end
1210  * @param tunnel_ctx this is NULL
1211  * @param sender who sent the message
1212  * @param message the actual message
1213  * @param atsi performance data for the connection
1214  * @return GNUNET_OK to keep the connection open,
1215  *         GNUNET_SYSERR to close it (signal serious error)
1216  */
1217 static int
1218 client_handle_receive_close_ack (void *cls,
1219                                  struct GNUNET_MESH_Tunnel *tunnel,
1220                                  void **tunnel_ctx,
1221                                  const struct GNUNET_PeerIdentity *sender,
1222                                  const struct GNUNET_MessageHeader *message,
1223                                  const struct GNUNET_ATS_Information*atsi)
1224 {
1225   struct GNUNET_STREAM_Socket *socket = cls;
1226
1227   return GNUNET_OK;
1228 }
1229
1230
1231 /**
1232  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1233  *
1234  * @param cls the socket (set from GNUNET_MESH_connect)
1235  * @param tunnel connection to the other end
1236  * @param tunnel_ctx this is NULL
1237  * @param sender who sent the message
1238  * @param message the actual message
1239  * @param atsi performance data for the connection
1240  * @return GNUNET_OK to keep the connection open,
1241  *         GNUNET_SYSERR to close it (signal serious error)
1242  */
1243 static int
1244 client_handle_close (void *cls,
1245                      struct GNUNET_MESH_Tunnel *tunnel,
1246                      void **tunnel_ctx,
1247                      const struct GNUNET_PeerIdentity *sender,
1248                      const struct GNUNET_MessageHeader *message,
1249                      const struct GNUNET_ATS_Information*atsi)
1250 {
1251   struct GNUNET_STREAM_Socket *socket = cls;
1252
1253   return GNUNET_OK;
1254 }
1255
1256
1257 /**
1258  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1259  *
1260  * @param cls the socket (set from GNUNET_MESH_connect)
1261  * @param tunnel connection to the other end
1262  * @param tunnel_ctx this is NULL
1263  * @param sender who sent the message
1264  * @param message the actual message
1265  * @param atsi performance data for the connection
1266  * @return GNUNET_OK to keep the connection open,
1267  *         GNUNET_SYSERR to close it (signal serious error)
1268  */
1269 static int
1270 client_handle_close_ack (void *cls,
1271                          struct GNUNET_MESH_Tunnel *tunnel,
1272                          void **tunnel_ctx,
1273                          const struct GNUNET_PeerIdentity *sender,
1274                          const struct GNUNET_MessageHeader *message,
1275                          const struct GNUNET_ATS_Information*atsi)
1276 {
1277   struct GNUNET_STREAM_Socket *socket = cls;
1278
1279   return GNUNET_OK;
1280 }
1281
1282 /*****************************/
1283 /* Server's Message Handlers */
1284 /*****************************/
1285
1286 /**
1287  * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1288  *
1289  * @param cls the closure
1290  * @param tunnel connection to the other end
1291  * @param tunnel_ctx the socket
1292  * @param sender who sent the message
1293  * @param message the actual message
1294  * @param atsi performance data for the connection
1295  * @return GNUNET_OK to keep the connection open,
1296  *         GNUNET_SYSERR to close it (signal serious error)
1297  */
1298 static int
1299 server_handle_data (void *cls,
1300                     struct GNUNET_MESH_Tunnel *tunnel,
1301                     void **tunnel_ctx,
1302                     const struct GNUNET_PeerIdentity *sender,
1303                     const struct GNUNET_MessageHeader *message,
1304                     const struct GNUNET_ATS_Information*atsi)
1305 {
1306   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1307
1308   return handle_data (socket,
1309                       tunnel,
1310                       sender,
1311                       (const struct GNUNET_STREAM_DataMessage *)message,
1312                       atsi);
1313 }
1314
1315
1316 /**
1317  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
1318  *
1319  * @param cls the closure
1320  * @param tunnel connection to the other end
1321  * @param tunnel_ctx the socket
1322  * @param sender who sent the message
1323  * @param message the actual message
1324  * @param atsi performance data for the connection
1325  * @return GNUNET_OK to keep the connection open,
1326  *         GNUNET_SYSERR to close it (signal serious error)
1327  */
1328 static int
1329 server_handle_hello (void *cls,
1330                      struct GNUNET_MESH_Tunnel *tunnel,
1331                      void **tunnel_ctx,
1332                      const struct GNUNET_PeerIdentity *sender,
1333                      const struct GNUNET_MessageHeader *message,
1334                      const struct GNUNET_ATS_Information*atsi)
1335 {
1336   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1337   struct GNUNET_STREAM_HelloAckMessage *reply;
1338
1339   GNUNET_assert (socket->tunnel == tunnel);
1340   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1341               "Received HELLO from %s\n", GNUNET_i2s(sender));
1342
1343   /* Catch possible protocol breaks */
1344   GNUNET_break_op (0 != memcmp (&socket->other_peer, 
1345                                 sender,
1346                                 sizeof (struct GNUNET_PeerIdentity)));
1347
1348   if (STATE_INIT == socket->state)
1349     {
1350       /* Get the random sequence number */
1351       socket->write_sequence_number = 
1352         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1353       reply = 
1354         GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1355       reply->header.header.size = 
1356         htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1357       reply->header.header.type = 
1358         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1359       reply->sequence_number = htonl (socket->write_sequence_number);
1360       queue_message (socket, 
1361                      &reply->header,
1362                      &set_state_hello_wait, 
1363                      NULL);
1364     }
1365   else
1366     {
1367       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1368                   "Client sent HELLO when in state %d\n", socket->state);
1369       /* FIXME: Send RESET? */
1370       
1371     }
1372   return GNUNET_OK;
1373 }
1374
1375
1376 /**
1377  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1378  *
1379  * @param cls the closure
1380  * @param tunnel connection to the other end
1381  * @param tunnel_ctx the socket
1382  * @param sender who sent the message
1383  * @param message the actual message
1384  * @param atsi performance data for the connection
1385  * @return GNUNET_OK to keep the connection open,
1386  *         GNUNET_SYSERR to close it (signal serious error)
1387  */
1388 static int
1389 server_handle_hello_ack (void *cls,
1390                          struct GNUNET_MESH_Tunnel *tunnel,
1391                          void **tunnel_ctx,
1392                          const struct GNUNET_PeerIdentity *sender,
1393                          const struct GNUNET_MessageHeader *message,
1394                          const struct GNUNET_ATS_Information*atsi)
1395 {
1396   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1397   const struct GNUNET_STREAM_HelloAckMessage *ack_message;
1398
1399   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
1400   GNUNET_assert (socket->tunnel == tunnel);
1401   if (STATE_HELLO_WAIT == socket->state)
1402     {
1403       socket->read_sequence_number = ntohl (ack_message->sequence_number);
1404       socket->receive_window_available = 
1405         ntohl (ack_message->receive_window_size);
1406       /* Attain ESTABLISHED state */
1407       set_state_established (NULL, socket);
1408     }
1409   else
1410     {
1411       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1412                   "Client sent HELLO_ACK when in state %d\n", socket->state);
1413       /* FIXME: Send RESET? */
1414       
1415     }
1416   return GNUNET_OK;
1417 }
1418
1419
1420 /**
1421  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1422  *
1423  * @param cls the closure
1424  * @param tunnel connection to the other end
1425  * @param tunnel_ctx the socket
1426  * @param sender who sent the message
1427  * @param message the actual message
1428  * @param atsi performance data for the connection
1429  * @return GNUNET_OK to keep the connection open,
1430  *         GNUNET_SYSERR to close it (signal serious error)
1431  */
1432 static int
1433 server_handle_reset (void *cls,
1434                      struct GNUNET_MESH_Tunnel *tunnel,
1435                      void **tunnel_ctx,
1436                      const struct GNUNET_PeerIdentity *sender,
1437                      const struct GNUNET_MessageHeader *message,
1438                      const struct GNUNET_ATS_Information*atsi)
1439 {
1440   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1441
1442   return GNUNET_OK;
1443 }
1444
1445
1446 /**
1447  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1448  *
1449  * @param cls the closure
1450  * @param tunnel connection to the other end
1451  * @param tunnel_ctx the socket
1452  * @param sender who sent the message
1453  * @param message the actual message
1454  * @param atsi performance data for the connection
1455  * @return GNUNET_OK to keep the connection open,
1456  *         GNUNET_SYSERR to close it (signal serious error)
1457  */
1458 static int
1459 server_handle_transmit_close (void *cls,
1460                               struct GNUNET_MESH_Tunnel *tunnel,
1461                               void **tunnel_ctx,
1462                               const struct GNUNET_PeerIdentity *sender,
1463                               const struct GNUNET_MessageHeader *message,
1464                               const struct GNUNET_ATS_Information*atsi)
1465 {
1466   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1467
1468   return handle_transmit_close (socket,
1469                                 tunnel,
1470                                 sender,
1471                                 (struct GNUNET_STREAM_MessageHeader *)message,
1472                                 atsi);
1473 }
1474
1475
1476 /**
1477  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1478  *
1479  * @param cls the closure
1480  * @param tunnel connection to the other end
1481  * @param tunnel_ctx the socket
1482  * @param sender who sent the message
1483  * @param message the actual message
1484  * @param atsi performance data for the connection
1485  * @return GNUNET_OK to keep the connection open,
1486  *         GNUNET_SYSERR to close it (signal serious error)
1487  */
1488 static int
1489 server_handle_transmit_close_ack (void *cls,
1490                                   struct GNUNET_MESH_Tunnel *tunnel,
1491                                   void **tunnel_ctx,
1492                                   const struct GNUNET_PeerIdentity *sender,
1493                                   const struct GNUNET_MessageHeader *message,
1494                                   const struct GNUNET_ATS_Information*atsi)
1495 {
1496   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1497
1498   return GNUNET_OK;
1499 }
1500
1501
1502 /**
1503  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1504  *
1505  * @param cls the closure
1506  * @param tunnel connection to the other end
1507  * @param tunnel_ctx the socket
1508  * @param sender who sent the message
1509  * @param message the actual message
1510  * @param atsi performance data for the connection
1511  * @return GNUNET_OK to keep the connection open,
1512  *         GNUNET_SYSERR to close it (signal serious error)
1513  */
1514 static int
1515 server_handle_receive_close (void *cls,
1516                              struct GNUNET_MESH_Tunnel *tunnel,
1517                              void **tunnel_ctx,
1518                              const struct GNUNET_PeerIdentity *sender,
1519                              const struct GNUNET_MessageHeader *message,
1520                              const struct GNUNET_ATS_Information*atsi)
1521 {
1522   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1523
1524   return GNUNET_OK;
1525 }
1526
1527
1528 /**
1529  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
1530  *
1531  * @param cls the closure
1532  * @param tunnel connection to the other end
1533  * @param tunnel_ctx the socket
1534  * @param sender who sent the message
1535  * @param message the actual message
1536  * @param atsi performance data for the connection
1537  * @return GNUNET_OK to keep the connection open,
1538  *         GNUNET_SYSERR to close it (signal serious error)
1539  */
1540 static int
1541 server_handle_receive_close_ack (void *cls,
1542                                  struct GNUNET_MESH_Tunnel *tunnel,
1543                                  void **tunnel_ctx,
1544                                  const struct GNUNET_PeerIdentity *sender,
1545                                  const struct GNUNET_MessageHeader *message,
1546                                  const struct GNUNET_ATS_Information*atsi)
1547 {
1548   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1549
1550   return GNUNET_OK;
1551 }
1552
1553
1554 /**
1555  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
1556  *
1557  * @param cls the closure
1558  * @param tunnel connection to the other end
1559  * @param tunnel_ctx the socket
1560  * @param sender who sent the message
1561  * @param message the actual message
1562  * @param atsi performance data for the connection
1563  * @return GNUNET_OK to keep the connection open,
1564  *         GNUNET_SYSERR to close it (signal serious error)
1565  */
1566 static int
1567 server_handle_close (void *cls,
1568                      struct GNUNET_MESH_Tunnel *tunnel,
1569                      void **tunnel_ctx,
1570                      const struct GNUNET_PeerIdentity *sender,
1571                      const struct GNUNET_MessageHeader *message,
1572                      const struct GNUNET_ATS_Information*atsi)
1573 {
1574   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1575
1576   return GNUNET_OK;
1577 }
1578
1579
1580 /**
1581  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1582  *
1583  * @param cls the closure
1584  * @param tunnel connection to the other end
1585  * @param tunnel_ctx the socket
1586  * @param sender who sent the message
1587  * @param message the actual message
1588  * @param atsi performance data for the connection
1589  * @return GNUNET_OK to keep the connection open,
1590  *         GNUNET_SYSERR to close it (signal serious error)
1591  */
1592 static int
1593 server_handle_close_ack (void *cls,
1594                          struct GNUNET_MESH_Tunnel *tunnel,
1595                          void **tunnel_ctx,
1596                          const struct GNUNET_PeerIdentity *sender,
1597                          const struct GNUNET_MessageHeader *message,
1598                          const struct GNUNET_ATS_Information*atsi)
1599 {
1600   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1601
1602   return GNUNET_OK;
1603 }
1604
1605
1606 /**
1607  * Message Handler for mesh
1608  *
1609  * @param socket the socket through which the ack was received
1610  * @param tunnel connection to the other end
1611  * @param sender who sent the message
1612  * @param ack the acknowledgment message
1613  * @param atsi performance data for the connection
1614  * @return GNUNET_OK to keep the connection open,
1615  *         GNUNET_SYSERR to close it (signal serious error)
1616  */
1617 static int
1618 handle_ack (struct GNUNET_STREAM_Socket *socket,
1619             struct GNUNET_MESH_Tunnel *tunnel,
1620             const struct GNUNET_PeerIdentity *sender,
1621             const struct GNUNET_STREAM_AckMessage *ack,
1622             const struct GNUNET_ATS_Information*atsi)
1623 {
1624   switch (socket->state)
1625     {
1626     case (STATE_ESTABLISHED):
1627       if (NULL == socket->write_handle)
1628         {
1629           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1630                       "Received DATA ACK when write_handle is NULL\n");
1631           return GNUNET_OK;
1632         }
1633
1634       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1635       socket->receive_window_available = 
1636         ntohl (ack->receive_window_remaining);
1637       write_data (socket);
1638       break;
1639     default:
1640       break;
1641     }
1642   return GNUNET_OK;
1643 }
1644
1645
1646 /**
1647  * Message Handler for mesh
1648  *
1649  * @param cls the 'struct GNUNET_STREAM_Socket'
1650  * @param tunnel connection to the other end
1651  * @param tunnel_ctx unused
1652  * @param sender who sent the message
1653  * @param message the actual message
1654  * @param atsi performance data for the connection
1655  * @return GNUNET_OK to keep the connection open,
1656  *         GNUNET_SYSERR to close it (signal serious error)
1657  */
1658 static int
1659 client_handle_ack (void *cls,
1660                    struct GNUNET_MESH_Tunnel *tunnel,
1661                    void **tunnel_ctx,
1662                    const struct GNUNET_PeerIdentity *sender,
1663                    const struct GNUNET_MessageHeader *message,
1664                    const struct GNUNET_ATS_Information*atsi)
1665 {
1666   struct GNUNET_STREAM_Socket *socket = cls;
1667   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1668  
1669   return handle_ack (socket, tunnel, sender, ack, atsi);
1670 }
1671
1672
1673 /**
1674  * Message Handler for mesh
1675  *
1676  * @param cls the server's listen socket
1677  * @param tunnel connection to the other end
1678  * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
1679  * @param sender who sent the message
1680  * @param message the actual message
1681  * @param atsi performance data for the connection
1682  * @return GNUNET_OK to keep the connection open,
1683  *         GNUNET_SYSERR to close it (signal serious error)
1684  */
1685 static int
1686 server_handle_ack (void *cls,
1687                    struct GNUNET_MESH_Tunnel *tunnel,
1688                    void **tunnel_ctx,
1689                    const struct GNUNET_PeerIdentity *sender,
1690                    const struct GNUNET_MessageHeader *message,
1691                    const struct GNUNET_ATS_Information*atsi)
1692 {
1693   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
1694   const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
1695  
1696   return handle_ack (socket, tunnel, sender, ack, atsi);
1697 }
1698
1699
1700 /**
1701  * For client message handlers, the stream socket is in the
1702  * closure argument.
1703  */
1704 static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
1705   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1706   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1707    sizeof (struct GNUNET_STREAM_AckMessage) },
1708   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1709    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1710   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1711    sizeof (struct GNUNET_STREAM_MessageHeader)},
1712   {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1713    sizeof (struct GNUNET_STREAM_MessageHeader)},
1714   {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1715    sizeof (struct GNUNET_STREAM_MessageHeader)},
1716   {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1717    sizeof (struct GNUNET_STREAM_MessageHeader)},
1718   {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1719    sizeof (struct GNUNET_STREAM_MessageHeader)},
1720   {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1721    sizeof (struct GNUNET_STREAM_MessageHeader)},
1722   {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1723    sizeof (struct GNUNET_STREAM_MessageHeader)},
1724   {NULL, 0, 0}
1725 };
1726
1727
1728 /**
1729  * For server message handlers, the stream socket is in the
1730  * tunnel context, and the listen socket in the closure argument.
1731  */
1732 static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
1733   {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
1734   {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
1735    sizeof (struct GNUNET_STREAM_AckMessage) },
1736   {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
1737    sizeof (struct GNUNET_STREAM_MessageHeader)},
1738   {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
1739    sizeof (struct GNUNET_STREAM_HelloAckMessage)},
1740   {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
1741    sizeof (struct GNUNET_STREAM_MessageHeader)},
1742   {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
1743    sizeof (struct GNUNET_STREAM_MessageHeader)},
1744   {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
1745    sizeof (struct GNUNET_STREAM_MessageHeader)},
1746   {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
1747    sizeof (struct GNUNET_STREAM_MessageHeader)},
1748   {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
1749    sizeof (struct GNUNET_STREAM_MessageHeader)},
1750   {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
1751    sizeof (struct GNUNET_STREAM_MessageHeader)},
1752   {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
1753    sizeof (struct GNUNET_STREAM_MessageHeader)},
1754   {NULL, 0, 0}
1755 };
1756
1757
1758 /**
1759  * Function called when our target peer is connected to our tunnel
1760  *
1761  * @param cls the socket for which this tunnel is created
1762  * @param peer the peer identity of the target
1763  * @param atsi performance data for the connection
1764  */
1765 static void
1766 mesh_peer_connect_callback (void *cls,
1767                             const struct GNUNET_PeerIdentity *peer,
1768                             const struct GNUNET_ATS_Information * atsi)
1769 {
1770   struct GNUNET_STREAM_Socket *socket = cls;
1771   struct GNUNET_STREAM_MessageHeader *message;
1772
1773   if (0 != memcmp (&socket->other_peer, 
1774                    peer, 
1775                    sizeof (struct GNUNET_PeerIdentity)))
1776     {
1777       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778                   "A peer (%s) which is not our target has connected to our tunnel", 
1779                   GNUNET_i2s (peer));
1780       return;
1781     }
1782   
1783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784               "Target peer %s connected\n", GNUNET_i2s (peer));
1785   
1786   /* Set state to INIT */
1787   socket->state = STATE_INIT;
1788
1789   /* Send HELLO message */
1790   message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1791   message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1792   message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1793   queue_message (socket,
1794                  message,
1795                  &set_state_hello_wait,
1796                  NULL);
1797
1798   /* Call open callback */
1799   if (NULL == socket->open_cb)
1800     {
1801       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1802                   "STREAM_open callback is NULL\n");
1803     }
1804 }
1805
1806
1807 /**
1808  * Function called when our target peer is disconnected from our tunnel
1809  *
1810  * @param cls the socket associated which this tunnel
1811  * @param peer the peer identity of the target
1812  */
1813 static void
1814 mesh_peer_disconnect_callback (void *cls,
1815                                const struct GNUNET_PeerIdentity *peer)
1816 {
1817
1818 }
1819
1820
1821 /**
1822  * Method called whenever a peer creates a tunnel to us
1823  *
1824  * @param cls closure
1825  * @param tunnel new handle to the tunnel
1826  * @param initiator peer that started the tunnel
1827  * @param atsi performance information for the tunnel
1828  * @return initial tunnel context for the tunnel
1829  *         (can be NULL -- that's not an error)
1830  */
1831 static void *
1832 new_tunnel_notify (void *cls,
1833                    struct GNUNET_MESH_Tunnel *tunnel,
1834                    const struct GNUNET_PeerIdentity *initiator,
1835                    const struct GNUNET_ATS_Information *atsi)
1836 {
1837   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
1838   struct GNUNET_STREAM_Socket *socket;
1839
1840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1841               "Peer %s initiated tunnel to us\n", GNUNET_i2s (initiator));
1842   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1843   socket->tunnel = tunnel;
1844   socket->session_id = 0;       /* FIXME */
1845   socket->other_peer = *initiator;
1846   socket->state = STATE_INIT;
1847   socket->derived = GNUNET_YES;
1848
1849   /* FIXME: Copy MESH handle from lsocket to socket */
1850
1851   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
1852                                            socket,
1853                                            &socket->other_peer))
1854     {
1855       socket->state = STATE_CLOSED;
1856       /* FIXME: Send CLOSE message and then free */
1857       GNUNET_free (socket);
1858       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
1859     }
1860   return socket;
1861 }
1862
1863
1864 /**
1865  * Function called whenever an inbound tunnel is destroyed.  Should clean up
1866  * any associated state.  This function is NOT called if the client has
1867  * explicitly asked for the tunnel to be destroyed using
1868  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
1869  * the tunnel.
1870  *
1871  * @param cls closure (set from GNUNET_MESH_connect)
1872  * @param tunnel connection to the other end (henceforth invalid)
1873  * @param tunnel_ctx place where local state associated
1874  *                   with the tunnel is stored
1875  */
1876 static void 
1877 tunnel_cleaner (void *cls,
1878                 const struct GNUNET_MESH_Tunnel *tunnel,
1879                 void *tunnel_ctx)
1880 {
1881   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
1882   
1883   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1884               "Peer %s has terminated connection abruptly\n",
1885               GNUNET_i2s (&socket->other_peer));
1886
1887   socket->status = GNUNET_STREAM_SHUTDOWN;
1888   /* Clear Transmit handles */
1889   if (NULL != socket->transmit_handle)
1890     {
1891       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
1892       socket->transmit_handle = NULL;
1893     }
1894   socket->tunnel = NULL;
1895 }
1896
1897
1898 /*****************/
1899 /* API functions */
1900 /*****************/
1901
1902
1903 /**
1904  * Tries to open a stream to the target peer
1905  *
1906  * @param cfg configuration to use
1907  * @param target the target peer to which the stream has to be opened
1908  * @param app_port the application port number which uniquely identifies this
1909  *            stream
1910  * @param open_cb this function will be called after stream has be established 
1911  * @param open_cb_cls the closure for open_cb
1912  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
1913  * @return if successful it returns the stream socket; NULL if stream cannot be
1914  *         opened 
1915  */
1916 struct GNUNET_STREAM_Socket *
1917 GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
1918                     const struct GNUNET_PeerIdentity *target,
1919                     GNUNET_MESH_ApplicationType app_port,
1920                     GNUNET_STREAM_OpenCallback open_cb,
1921                     void *open_cb_cls,
1922                     ...)
1923 {
1924   struct GNUNET_STREAM_Socket *socket;
1925   enum GNUNET_STREAM_Option option;
1926   va_list vargs;                /* Variable arguments */
1927
1928   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1929               "%s\n", __func__);
1930
1931   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
1932   socket->other_peer = *target;
1933   socket->open_cb = open_cb;
1934   socket->open_cls = open_cb_cls;
1935
1936   /* Set defaults */
1937   socket->retransmit_timeout = 
1938     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
1939
1940   va_start (vargs, open_cb_cls); /* Parse variable args */
1941   do {
1942     option = va_arg (vargs, enum GNUNET_STREAM_Option);
1943     switch (option)
1944       {
1945       case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
1946         /* Expect struct GNUNET_TIME_Relative */
1947         socket->retransmit_timeout = va_arg (vargs,
1948                                              struct GNUNET_TIME_Relative);
1949         break;
1950       case GNUNET_STREAM_OPTION_END:
1951         break;
1952       }
1953   } while (GNUNET_STREAM_OPTION_END != option);
1954   va_end (vargs);               /* End of variable args parsing */
1955   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1956                                       10,  /* QUEUE size as parameter? */
1957                                       socket, /* cls */
1958                                       NULL, /* No inbound tunnel handler */
1959                                       &tunnel_cleaner,
1960                                       client_message_handlers,
1961                                       &app_port); /* We don't get inbound tunnels */
1962   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
1963     {
1964       GNUNET_free (socket);
1965       return NULL;
1966     }
1967
1968   /* Now create the mesh tunnel to target */
1969   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1970               "Creating MESH Tunnel\n");
1971   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
1972                                               NULL, /* Tunnel context */
1973                                               &mesh_peer_connect_callback,
1974                                               &mesh_peer_disconnect_callback,
1975                                               socket);
1976   GNUNET_assert (NULL != socket->tunnel);
1977   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
1978                                         target);
1979   
1980   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981               "%s() END\n", __func__);
1982   return socket;
1983 }
1984
1985
1986 /**
1987  * Shutdown the stream for reading or writing (man 2 shutdown).
1988  *
1989  * @param socket the stream socket
1990  * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
1991  */
1992 void
1993 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
1994                         int how)
1995 {
1996   return;
1997 }
1998
1999
2000 /**
2001  * Closes the stream
2002  *
2003  * @param socket the stream socket
2004  */
2005 void
2006 GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2007 {
2008   struct MessageQueue *head;
2009
2010   if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
2011   {
2012     /* socket closed with read task pending!? */
2013     GNUNET_break (0);
2014     GNUNET_SCHEDULER_cancel (socket->read_task);
2015     socket->read_task = GNUNET_SCHEDULER_NO_TASK;
2016   }
2017
2018   /* Clear Transmit handles */
2019   if (NULL != socket->transmit_handle)
2020     {
2021       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2022       socket->transmit_handle = NULL;
2023     }
2024
2025   /* Clear existing message queue */
2026   while (NULL != (head = socket->queue_head)) {
2027     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2028                                  socket->queue_tail,
2029                                  head);
2030     GNUNET_free (head->message);
2031     GNUNET_free (head);
2032   }
2033
2034   /* Close associated tunnel */
2035   if (NULL != socket->tunnel)
2036     {
2037       GNUNET_MESH_tunnel_destroy (socket->tunnel);
2038       socket->tunnel = NULL;
2039     }
2040
2041   /* Close mesh connection */
2042   if (NULL != socket->mesh && GNUNET_YES != socket->derived)
2043     {
2044       GNUNET_MESH_disconnect (socket->mesh);
2045       socket->mesh = NULL;
2046     }
2047   
2048   /* Release receive buffer */
2049   if (NULL != socket->receive_buffer)
2050     {
2051       GNUNET_free (socket->receive_buffer);
2052     }
2053
2054   GNUNET_free (socket);
2055 }
2056
2057
2058 /**
2059  * Listens for stream connections for a specific application ports
2060  *
2061  * @param cfg the configuration to use
2062  * @param app_port the application port for which new streams will be accepted
2063  * @param listen_cb this function will be called when a peer tries to establish
2064  *            a stream with us
2065  * @param listen_cb_cls closure for listen_cb
2066  * @return listen socket, NULL for any error
2067  */
2068 struct GNUNET_STREAM_ListenSocket *
2069 GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
2070                       GNUNET_MESH_ApplicationType app_port,
2071                       GNUNET_STREAM_ListenCallback listen_cb,
2072                       void *listen_cb_cls)
2073 {
2074   /* FIXME: Add variable args for passing configration options? */
2075   struct GNUNET_STREAM_ListenSocket *lsocket;
2076
2077   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
2078   lsocket->port = app_port;
2079   lsocket->listen_cb = listen_cb;
2080   lsocket->listen_cb_cls = listen_cb_cls;
2081   lsocket->mesh = GNUNET_MESH_connect (cfg,
2082                                        10, /* FIXME: QUEUE size as parameter? */
2083                                        lsocket, /* Closure */
2084                                        &new_tunnel_notify,
2085                                        &tunnel_cleaner,
2086                                        server_message_handlers,
2087                                        &app_port);
2088   GNUNET_assert (NULL != lsocket->mesh);
2089   return lsocket;
2090 }
2091
2092
2093 /**
2094  * Closes the listen socket
2095  *
2096  * @param lsocket the listen socket
2097  */
2098 void
2099 GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2100 {
2101   /* Close MESH connection */
2102   GNUNET_assert (NULL != lsocket->mesh);
2103   GNUNET_MESH_disconnect (lsocket->mesh);
2104   
2105   GNUNET_free (lsocket);
2106 }
2107
2108
2109 /**
2110  * Tries to write the given data to the stream
2111  *
2112  * @param socket the socket representing a stream
2113  * @param data the data buffer from where the data is written into the stream
2114  * @param size the number of bytes to be written from the data buffer
2115  * @param timeout the timeout period
2116  * @param write_cont the function to call upon writing some bytes into the stream
2117  * @param write_cont_cls the closure
2118  * @return handle to cancel the operation
2119  */
2120 struct GNUNET_STREAM_IOWriteHandle *
2121 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2122                      const void *data,
2123                      size_t size,
2124                      struct GNUNET_TIME_Relative timeout,
2125                      GNUNET_STREAM_CompletionContinuation write_cont,
2126                      void *write_cont_cls)
2127 {
2128   unsigned int num_needed_packets;
2129   unsigned int packet;
2130   struct GNUNET_STREAM_IOWriteHandle *io_handle;
2131   uint32_t packet_size;
2132   uint32_t payload_size;
2133   struct GNUNET_STREAM_DataMessage *data_msg;
2134   const void *sweep;
2135
2136   /* Return NULL if there is already a write request pending */
2137   if (NULL != socket->write_handle)
2138   {
2139     GNUNET_break (0);
2140     return NULL;
2141   }
2142   if (!((STATE_ESTABLISHED == socket->state)
2143         || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
2144         || (STATE_RECEIVE_CLOSED == socket->state)))
2145     {
2146       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2147                   "Attempting to write on a closed (OR) not-yet-established"
2148                   "stream\n"); 
2149       return NULL;
2150     } 
2151   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2152     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
2153   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2154   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2155   sweep = data;
2156   /* Divide the given buffer into packets for sending */
2157   for (packet=0; packet < num_needed_packets; packet++)
2158     {
2159       if ((packet + 1) * max_payload_size < size) 
2160         {
2161           payload_size = max_payload_size;
2162           packet_size = MAX_PACKET_SIZE;
2163         }
2164       else 
2165         {
2166           payload_size = size - packet * max_payload_size;
2167           packet_size =  payload_size + sizeof (struct
2168                                                 GNUNET_STREAM_DataMessage); 
2169         }
2170       io_handle->messages[packet] = GNUNET_malloc (packet_size);
2171       io_handle->messages[packet]->header.header.size = htons (packet_size);
2172       io_handle->messages[packet]->header.header.type =
2173         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
2174       io_handle->messages[packet]->sequence_number =
2175         htons (socket->write_sequence_number++);
2176       io_handle->messages[packet]->offset = htons (socket->write_offset);
2177
2178       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
2179          determined from RTT */
2180       io_handle->messages[packet]->ack_deadline = 
2181         GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply 
2182                                    (GNUNET_TIME_UNIT_SECONDS, 5));
2183       data_msg = io_handle->messages[packet];
2184       /* Copy data from given buffer to the packet */
2185       memcpy (&data_msg[1],
2186               sweep,
2187               payload_size);
2188       sweep += payload_size;
2189       socket->write_offset += payload_size;
2190     }
2191   socket->write_handle = io_handle;
2192   write_data (socket);
2193
2194   return io_handle;
2195 }
2196
2197
2198 /**
2199  * Tries to read data from the stream
2200  *
2201  * @param socket the socket representing a stream
2202  * @param timeout the timeout period
2203  * @param proc function to call with data (once only)
2204  * @param proc_cls the closure for proc
2205  * @return handle to cancel the operation
2206  */
2207 struct GNUNET_STREAM_IOReadHandle *
2208 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2209                     struct GNUNET_TIME_Relative timeout,
2210                     GNUNET_STREAM_DataProcessor proc,
2211                     void *proc_cls)
2212 {
2213   struct GNUNET_STREAM_IOReadHandle *read_handle;
2214   
2215   /* Return NULL if there is already a read handle; the user has to cancel that
2216   first before continuing or has to wait until it is completed */
2217   if (NULL != socket->read_handle) return NULL;
2218
2219   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2220   read_handle->proc = proc;
2221   socket->read_handle = read_handle;
2222
2223   /* Check if we have a packet at bitmap 0 */
2224   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
2225                                           0))
2226     {
2227       socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
2228                                                     socket);
2229    
2230     }
2231   
2232   /* Setup the read timeout task */
2233   socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
2234                                                                &read_io_timeout,
2235                                                                socket);
2236   return read_handle;
2237 }
2238
2239
2240 /**
2241  * Cancel pending write operation.
2242  *
2243  * @param ioh handle to operation to cancel
2244  */
2245 void
2246 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2247 {
2248   return;
2249 }
2250
2251
2252 /**
2253  * Cancel pending read operation.
2254  *
2255  * @param ioh handle to operation to cancel
2256  */
2257 void
2258 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
2259 {
2260   return;
2261 }