-modified testcase according to modified interface
authorSree Harsha Totakura <totakura@in.tum.de>
Sat, 3 Mar 2012 14:57:24 +0000 (14:57 +0000)
committerSree Harsha Totakura <totakura@in.tum.de>
Sat, 3 Mar 2012 14:57:24 +0000 (14:57 +0000)
src/stream/Makefile.am
src/stream/stream_api.c
src/stream/test_stream_local.c
src/stream/test_stream_local.conf

index 385c0cf4c2e101b9514e8d7970d25801e0b04bd4..c2d2ac7f09c43333e6d4140576dec97c0ca88e98 100644 (file)
@@ -33,7 +33,8 @@ test_stream_local_SOURCES = \
  test_stream_local.c
 test_stream_local_LDADD = \
  $(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/util/libgnunetutil.la  
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la 
 
 #test_stream_halfclose_SOURCES = \
 # test_stream_halfclose.c
index 84fcdfd6b56f4efa87481fb754c560ab52b88d92..58f180c4d7dd41f029809e43e9630bfc1092ab7b 100644 (file)
   Boston, MA 02111-1307, USA.
 */
 
+/* TODO:
+ * Checks for matching the sender and socket->other_peer in server
+ * message handlers  */
+
 /**
  * @file stream/stream_api.c
  * @brief Implementation of the stream library
@@ -256,6 +260,11 @@ struct GNUNET_STREAM_Socket
    */
   unsigned int retries;
 
+  /**
+   * The application port number (type: uint32_t)
+   */
+  GNUNET_MESH_ApplicationType app_port;
+
   /**
    * The session id associated with this stream connection
    * FIXME: Not used currently, may be removed
@@ -328,6 +337,7 @@ struct GNUNET_STREAM_ListenSocket
 
   /**
    * The service port
+   * FIXME: Remove if not required!
    */
   GNUNET_MESH_ApplicationType port;
 };
@@ -1314,6 +1324,12 @@ server_handle_hello (void *cls,
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
   GNUNET_assert (socket->tunnel == tunnel);
+
+  /* Catch possible protocol breaks */
+  GNUNET_break_op (0 != memcmp (&socket->other_peer, 
+                                sender,
+                                sizeof (struct GNUNET_PeerIdentity)));
+
   if (STATE_INIT == socket->state)
     {
       /* Get the random sequence number */
@@ -1791,6 +1807,78 @@ mesh_peer_disconnect_callback (void *cls,
 }
 
 
+/**
+ * Method called whenever a peer creates a tunnel to us
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ *         (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel_notify (void *cls,
+                   struct GNUNET_MESH_Tunnel *tunnel,
+                   const struct GNUNET_PeerIdentity *initiator,
+                   const struct GNUNET_ATS_Information *atsi)
+{
+  struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+  struct GNUNET_STREAM_Socket *socket;
+
+  socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+  socket->tunnel = tunnel;
+  socket->session_id = 0;       /* FIXME */
+  socket->other_peer = *initiator;
+  socket->state = STATE_INIT;
+
+  if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
+                                           socket,
+                                           &socket->other_peer))
+    {
+      socket->state = STATE_CLOSED;
+      /* FIXME: Send CLOSE message and then free */
+      GNUNET_free (socket);
+      GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
+    }
+  return socket;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed.  Should clean up
+ * any associated state.  This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ *                   with the tunnel is stored
+ */
+static void 
+tunnel_cleaner (void *cls,
+                const struct GNUNET_MESH_Tunnel *tunnel,
+                void *tunnel_ctx)
+{
+  struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+  
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer %s has terminated connection abruptly\n",
+              GNUNET_i2s (&socket->other_peer));
+
+  socket->status = GNUNET_STREAM_SHUTDOWN;
+  /* Clear Transmit handles */
+  if (NULL != socket->transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+      socket->transmit_handle = NULL;
+    }
+  socket->tunnel = NULL;
+}
+
+
 /*****************/
 /* API functions */
 /*****************/
@@ -1820,6 +1908,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   struct GNUNET_STREAM_Socket *socket;
   enum GNUNET_STREAM_Option option;
   va_list vargs;                /* Variable arguments */
+  GNUNET_MESH_ApplicationType no_port;
 
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *target;
@@ -1845,15 +1934,19 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
       }
   } while (GNUNET_STREAM_OPTION_END != option);
   va_end (vargs);               /* End of variable args parsing */
-
+  no_port = 0;
   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
                                       1,  /* QUEUE size as parameter? */
                                       socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
                                       NULL, /* No inbound tunnel cleaner */
                                       client_message_handlers,
-                                      NULL); /* We don't get inbound tunnels */
-  // FIXME: if (NULL == socket->mesh) ...
+                                      &no_port); /* We don't get inbound tunnels */
+  if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
+    {
+      GNUNET_free (socket);
+      return NULL;
+    }
 
   /* Now create the mesh tunnel to target */
   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
@@ -1867,6 +1960,20 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
 }
 
 
+/**
+ * Shutdown the stream for reading or writing (man 2 shutdown).
+ *
+ * @param socket the stream socket
+ * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
+ */
+void
+GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
+                       int how)
+{
+  return;
+}
+
+
 /**
  * Closes the stream
  *
@@ -1925,78 +2032,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
 }
 
 
-/**
- * Method called whenever a peer creates a tunnel to us
- *
- * @param cls closure
- * @param tunnel new handle to the tunnel
- * @param initiator peer that started the tunnel
- * @param atsi performance information for the tunnel
- * @return initial tunnel context for the tunnel
- *         (can be NULL -- that's not an error)
- */
-static void *
-new_tunnel_notify (void *cls,
-                   struct GNUNET_MESH_Tunnel *tunnel,
-                   const struct GNUNET_PeerIdentity *initiator,
-                   const struct GNUNET_ATS_Information *atsi)
-{
-  struct GNUNET_STREAM_ListenSocket *lsocket = cls;
-  struct GNUNET_STREAM_Socket *socket;
-
-  socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
-  socket->tunnel = tunnel;
-  socket->session_id = 0;       /* FIXME */
-  socket->other_peer = *initiator;
-  socket->state = STATE_INIT;
-
-  if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
-                                           socket,
-                                           &socket->other_peer))
-    {
-      socket->state = STATE_CLOSED;
-      /* FIXME: Send CLOSE message and then free */
-      GNUNET_free (socket);
-      GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
-    }
-  return socket;
-}
-
-
-/**
- * Function called whenever an inbound tunnel is destroyed.  Should clean up
- * any associated state.  This function is NOT called if the client has
- * explicitly asked for the tunnel to be destroyed using
- * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
- * the tunnel.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end (henceforth invalid)
- * @param tunnel_ctx place where local state associated
- *                   with the tunnel is stored
- */
-static void 
-tunnel_cleaner (void *cls,
-                const struct GNUNET_MESH_Tunnel *tunnel,
-                void *tunnel_ctx)
-{
-  struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-  
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer %s has terminated connection abruptly\n",
-              GNUNET_i2s (&socket->other_peer));
-
-  socket->status = GNUNET_STREAM_SHUTDOWN;
-  /* Clear Transmit handles */
-  if (NULL != socket->transmit_handle)
-    {
-      GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
-      socket->transmit_handle = NULL;
-    }
-  socket->tunnel = NULL;
-}
-
-
 /**
  * Listens for stream connections for a specific application ports
  *
@@ -2178,3 +2213,27 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
                                                                socket);
   return read_handle;
 }
+
+
+/**
+ * Cancel pending write operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
+{
+  return;
+}
+
+
+/**
+ * Cancel pending read operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
+{
+  return;
+}
index 4da1258fc1b4759b39f1adb536d4fd562f87b9ee..2eeedeec5e25a21b777d7103cf53e4e71aec1ad7 100644 (file)
@@ -30,6 +30,7 @@
 #include "gnunet_util_lib.h"
 #include "gnunet_mesh_service.h"
 #include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
 
 #define VERBOSE 1
 
@@ -44,9 +45,14 @@ struct PeerData
   struct GNUNET_STREAM_Socket *socket;
 
   /**
-   * Peer's io handle
+   * Peer's io write handle
    */
-  struct GNUNET_STREAM_IOHandle *io_handle;
+  struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+
+  /**
+   * Peer's io read handle
+   */
+  struct GNUNET_STREAM_IOReadHandle *io_read_handle;
 
   /**
    * Bytes the peer has written
@@ -63,6 +69,7 @@ static struct GNUNET_OS_Process *arm_pid;
 static struct PeerData peer1;
 static struct PeerData peer2;
 static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static struct GNUNET_CONFIGURATION_Handle *config;
 
 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
 static GNUNET_SCHEDULER_TaskIdentifier test_task;
@@ -78,7 +85,8 @@ static void
 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   GNUNET_STREAM_close (peer1.socket);
-  GNUNET_STREAM_close (peer2.socket);
+  if (NULL != peer2.socket)
+    GNUNET_STREAM_close (peer2.socket);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
   if (0 != abort_task)
   {
@@ -90,6 +98,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+  /* Free the duplicated configuration */
+  GNUNET_CONFIGURATION_destroy (config);
   GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
   GNUNET_OS_process_close (arm_pid);
 }
@@ -115,6 +125,22 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   do_shutdown (cls, tc);
 }
 
+/**
+ * Signature for input processor 
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read 
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ *         given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+                 enum GNUNET_STREAM_Status status,
+                 const void *input_data,
+                 size_t size);
+
 
 /**
  * The write completion function; called upon writing some data to stream or
@@ -138,26 +164,28 @@ write_completion (void *cls,
 
   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
     {
-      peer->io_handle = GNUNET_STREAM_write (peer->socket,
-                                             (void *) data,
-                                             strlen(data) - peer->bytes_wrote,
-                                             GNUNET_TIME_relative_multiply
-                                             (GNUNET_TIME_UNIT_SECONDS, 5),
-                                             &write_completion,
-                                             cls);
-      GNUNET_assert (NULL != peer->io_handle);
+      peer->io_write_handle =
+        GNUNET_STREAM_write (peer->socket,
+                             (void *) data,
+                             strlen(data) - peer->bytes_wrote,
+                             GNUNET_TIME_relative_multiply
+                             (GNUNET_TIME_UNIT_SECONDS, 5),
+                             &write_completion,
+                             cls);
+      GNUNET_assert (NULL != peer->io_write_handle);
     }
   else
     {
       if (&peer1 == peer)   /* Peer1 has finished writing; should read now */
         {
-          peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                                peer->socket,
-                                                GNUNET_TIME_relative_multiply
-                                                (GNUNET_TIME_UNIT_SECONDS, 5),
-                                                &input_processor,
-                                                cls);
-          GNUNET_assert (NULL!=peer->io_handle);
+          peer->io_read_handle =
+            GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+                                peer->socket,
+                                GNUNET_TIME_relative_multiply
+                                (GNUNET_TIME_UNIT_SECONDS, 5),
+                                &input_processor,
+                                cls);
+          GNUNET_assert (NULL!=peer->io_read_handle);
         }
     }
 }
@@ -175,18 +203,19 @@ stream_open_cb (void *cls,
 {
   struct PeerData *peer;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
   peer = (struct PeerData *) cls;
   peer->bytes_wrote = 0;
   GNUNET_assert (socket == peer1.socket);
   GNUNET_assert (socket == peer->socket);
-  peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */
-                                         (void *) data, /* data */
-                                         strlen(data),
-                                         GNUNET_TIME_relative_multiply
-                                         (GNUNET_TIME_UNIT_SECONDS, 5),
-                                         &write_completion,
+  peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */
+                                               (void *) data, /* data */
+                                               strlen(data),
+                                               GNUNET_TIME_relative_multiply
+                                               (GNUNET_TIME_UNIT_SECONDS, 5),
+                                               &write_completion,
                                          cls);
-  GNUNET_assert (NULL != peer->io_handle);
+  GNUNET_assert (NULL != peer->io_write_handle);
 }
 
 
@@ -210,7 +239,7 @@ input_processor (void *cls,
 
   peer = (struct PeerData *) cls;
 
-  GNUNET_assert (GNUNET_STERAM_OK == status);
+  GNUNET_assert (GNUNET_STREAM_OK == status);
   GNUNET_assert (size < strlen (data));
   GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, 
                           (const char *) input_data,
@@ -219,27 +248,28 @@ input_processor (void *cls,
   
   if (peer->bytes_read < strlen (data))
     {
-      peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                            peer->socket,
-                                            GNUNET_TIME_relative_multiply
-                                            (GNUNET_TIME_UNIT_SECONDS, 5),
-                                            &input_processor,
-                                            cls);
-      GNUNET_assert (NULL != peer->io_handle);
+      peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+                                                 peer->socket,
+                                                 GNUNET_TIME_relative_multiply
+                                                 (GNUNET_TIME_UNIT_SECONDS, 5),
+                                                 &input_processor,
+                                                 cls);
+      GNUNET_assert (NULL != peer->io_read_handle);
     }
   else 
     {
       if (&peer2 == peer)    /* Peer2 has completed reading; should write */
         {
           peer->bytes_wrote = 0;
-          peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
-                                                 peer->socket,
-                                                 (void *) data,
-                                                 strlen(data),
-                                                 GNUNET_TIME_relative_multiply
-                                                 (GNUNET_TIME_UNIT_SECONDS, 5),
-                                                 &write_completion,
-                                                 cls);
+          peer->io_write_handle = 
+            GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
+                                 peer->socket,
+                                 (void *) data,
+                                 strlen(data),
+                                 GNUNET_TIME_relative_multiply
+                                 (GNUNET_TIME_UNIT_SECONDS, 5),
+                                 &write_completion,
+                                 cls);
         }
       else                      /* Peer1 has completed reading. End of tests */
         {
@@ -261,12 +291,13 @@ stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   GNUNET_assert (NULL != cls);
   peer2.bytes_read = 0;
   GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
-  peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
-                                        GNUNET_TIME_relative_multiply
-                                        (GNUNET_TIME_UNIT_SECONDS, 5),
-                                        &input_processor,
-                                        (void *) &peer2);
-  GNUNET_assert (NULL != peer2.io_handle);
+  peer2.io_read_handle =
+    GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
+                        GNUNET_TIME_relative_multiply
+                        (GNUNET_TIME_UNIT_SECONDS, 5),
+                        &input_processor,
+                        (void *) &peer2);
+  GNUNET_assert (NULL != peer2.io_read_handle);
 }
 
 
@@ -289,6 +320,9 @@ stream_listen_cb (void *cls,
   GNUNET_assert (NULL == initiator);   /* Local peer=NULL? */
   GNUNET_assert (socket != peer1.socket);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer connected: %s\n", GNUNET_i2s(initiator));
+
   peer2.socket = socket;
   read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
   return GNUNET_OK;
@@ -297,23 +331,33 @@ stream_listen_cb (void *cls,
 
 /**
  * Testing function
+ *
+ * @param cls NULL
+ * @param tc the task context
  */
 static void
 test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct GNUNET_PeerIdentity self;
+
   test_task = GNUNET_SCHEDULER_NO_TASK;
+  /* Get our identity */
+  GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+                                                                &self));
+
+  peer2_listen_socket = GNUNET_STREAM_listen (config,
+                                              10, /* App port */
+                                              &stream_listen_cb,
+                                              NULL);
+  GNUNET_assert (NULL != peer2_listen_socket);
 
   /* Connect to stream library */
-  peer1.socket = GNUNET_STREAM_open (NULL,         /* Null for local peer? */
+  peer1.socket = GNUNET_STREAM_open (config,
+                                     &self,         /* Null for local peer? */
                                      10,           /* App port */
                                      &stream_open_cb,
                                      (void *) &peer1);
-  GNUNET_assert (NULL != peer1.socket);
-  peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */
-                                              &stream_listen_cb,
-                                              NULL);
-  GNUNET_assert (NULL != peer2_listen_socket);
-                  
+  GNUNET_assert (NULL != peer1.socket);                  
 }
 
 /**
@@ -330,6 +374,8 @@ run (void *cls, char *const *args, const char *cfgfile,
                     "WARNING",
 #endif
                     NULL);
+   /* Duplicate the configuration */
+   config = GNUNET_CONFIGURATION_dup (cfg);
    arm_pid =
      GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
                               "gnunet-service-arm",
@@ -342,8 +388,8 @@ run (void *cls, char *const *args, const char *cfgfile,
      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                    (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort,
                                     NULL);
-
-   test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg);
+   
+   test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
 
 }
 
@@ -355,7 +401,7 @@ int main (int argc, char **argv)
   int ret;
 
   char *const argv2[] = { "test-stream-local",
-                          "-c", "test_stream.conf",
+                          "-c", "test_stream_local.conf",
 #if VERBOSE
                           "-L", "DEBUG",
 #endif
index 3d955f09edc38d21da0792aa5fb007ad2dfc3edb..3f229c30257dbfef9c664af4396f0b4710fc1cb4 100644 (file)
@@ -59,8 +59,8 @@ WEAKRANDOM = YES
 HOSTKEY = $SERVICEHOME/.hostkey
 
 [PATHS]
-DEFAULTCONFIG = test_mesh.conf
-SERVICEHOME = /tmp/test-mesh/
+DEFAULTCONFIG = test_stream_local.conf
+SERVICEHOME = /tmp/test-stream/
 
 [dns]
 AUTOSTART = NO