--- /dev/null
+#!@PYTHON@\r
+# This file is part of GNUnet.\r
+# (C) 2010 Christian Grothoff (and other contributing authors)\r
+#\r
+# GNUnet is free software; you can redistribute it and/or modify\r
+# it under the terms of the GNU General Public License as published\r
+# by the Free Software Foundation; either version 2, or (at your\r
+# option) any later version.\r
+#\r
+# GNUnet is distributed in the hope that it will be useful, but\r
+# WITHOUT ANY WARRANTY; without even the implied warranty of\r
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU\r
+# General Public License for more details.\r
+#\r
+# You should have received a copy of the GNU General Public License\r
+# along with GNUnet; see the file COPYING. If not, write to the\r
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,\r
+# Boston, MA 02111-1307, USA.\r
+#\r
+# Testcase for gnunet-peerinfo\r
+from __future__ import print_function\r
+import os\r
+import re\r
+import subprocess\r
+import sys\r
+import shutil\r
+import time\r
+\r
+class pexpect (object):\r
+ def __init__ (self):\r
+ super (pexpect, self).__init__ ()\r
+\r
+ def spawn (self, stdin, arglist, *pargs, **kwargs):\r
+ env = kwargs.pop ('env', None)\r
+ if env is None:\r
+ env = os.environ.copy ()\r
+ # This messes up some testcases, disable log redirection\r
+ env.pop ('GNUNET_FORCE_LOGFILE', None)\r
+ self.proc = subprocess.Popen (arglist, *pargs, env=env, **kwargs)\r
+ if self.proc is None:\r
+ print ("Failed to spawn a process {0}".format (arglist))\r
+ sys.exit (1)\r
+ if stdin is not None:\r
+ self.stdo, self.stde = self.proc.communicate (stdin)\r
+ else:\r
+ self.stdo, self.stde = self.proc.communicate ()\r
+ return self.proc\r
+\r
+ def expect (self, s, r, flags=0):\r
+ stream = self.stdo if s == 'stdout' else self.stde\r
+ if isinstance (r, str):\r
+ if r == "EOF":\r
+ if len (stream) == 0:\r
+ return True\r
+ else:\r
+ print ("Failed to find `{1}' in {0}, which is `{2}' ({3})".format (s, r, stream, len (stream)))\r
+ sys.exit (2)\r
+ raise ValueError ("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format (r))\r
+ m = r.search (stream, flags)\r
+ if not m:\r
+ print ("Failed to find `{1}' in {0}, which is is `{2}'".format (s, r.pattern, stream))\r
+ sys.exit (2)\r
+ stream = stream[m.end ():]\r
+ if s == 'stdout':\r
+ self.stdo = stream\r
+ else:\r
+ self.stde = stream\r
+ return m\r
+\r
+ def read (self, s, size=-1):\r
+ stream = self.stdo if s == 'stdout' else self.stde\r
+ result = ""\r
+ if size < 0:\r
+ result = stream\r
+ new_stream = ""\r
+ else:\r
+ result = stream[0:size]\r
+ new_stream = stream[size:]\r
+ if s == 'stdout':\r
+ self.stdo = new_stream\r
+ else:\r
+ self.stde = new_stream\r
+ return result\r
-#!@PYTHON@
+#!/usr/bin/python
# This file is part of GNUnet.
# (C) 2010 Christian Grothoff (and other contributing authors)
#
#
# Functions for integration testing
import os
-import re
import subprocess
import sys
import shutil
import time
-
+from gnunet_pyexpect import pexpect
class Check:
def __init__(self, test):
neg_cont (self)
else:
pos_cont (self)
- def eval(self, failed_only):
+ def evaluate (self, failed_only):
pos = 0
neg = 0
for c in self.conditions:
- if (False == c.eval (failed_only)):
+ if (False == c.evaluate (failed_only)):
neg += 1
else:
pos += 1
self.type = type
def check(self):
return False;
- def eval(self, failed_only):
+ def evaluate (self, failed_only):
if ((self.fulfilled == False) and (failed_only == True)):
print str(self.type) + 'condition for was ' + str(self.fulfilled)
elif (failed_only == False):
return False
else:
return True
- def eval(self, failed_only):
+ def evaluate (self, failed_only):
if ((self.fulfilled == False) and (failed_only == True)):
print str(self.type) + 'condition for file '+self.file+' was ' + str(self.fulfilled)
elif (failed_only == False):
return False
else:
return True
- def eval(self, failed_only):
- if (self.result == -1):
- res = 'NaN'
- else:
- res = str(self.result)
+ def evaluate (self, failed_only):
+ if (self.result == -1):
+ res = 'NaN'
+ else:
+ res = str(self.result)
if (self.fulfilled == False):
fail = " FAIL!"
op = " != "
class Test:
def __init__(self, testname, verbose):
self.peers = list()
- self.verbose = verbose;
- self.name = testname;
- srcdir = "../.."
+ self.verbose = verbose;
+ self.name = testname;
+ srcdir = "../.."
gnunet_pyexpect_dir = os.path.join (srcdir, "contrib")
if gnunet_pyexpect_dir not in sys.path:
- sys.path.append (gnunet_pyexpect_dir)
- from gnunet_pyexpect import pexpect
+ sys.path.append (gnunet_pyexpect_dir)
self.gnunetarm = ''
self.gnunetstatistics = ''
if os.name == 'posix':
- self.gnunetarm = 'gnunet-arm'
- self.gnunetstatistics = 'gnunet-statistics'
+ self.gnunetarm = 'gnunet-arm'
+ self.gnunetstatistics = 'gnunet-statistics'
+ self.gnunetpeerinfo = 'gnunet-peerinfo'
elif os.name == 'nt':
- self.gnunetarm = 'gnunet-arm.exe'
- self.gnunetstatistics = 'gnunet-statistics.exe'
+ self.gnunetarm = 'gnunet-arm.exe'
+ self.gnunetstatistics = 'gnunet-statistics.exe'
+ self.gnunetpeerinfo = 'gnunet-peerinfo.exe'
if os.name == "nt":
shutil.rmtree (os.path.join (os.getenv ("TEMP"), testname), True)
else:
shutil.rmtree ("/tmp/" + testname, True)
- def add_peer (peer):
- self.conditions.append(condition)
+ def add_peer (self, peer):
+ self.peers.append(peer)
def p (self, msg):
if (self.verbose == True):
- print msg
+ print msg
class Peer:
def __init__(self, test, cfg_file):
if (False == os.path.isfile(cfg_file)):
print ("Peer cfg " + cfg_file + ": FILE NOT FOUND")
+ self.id = ""
self.test = test
self.started = False
self.cfg = cfg_file
def __del__(self):
- if (self.started == True):
- print 'ERROR! Peer using cfg ' + self.cfg + ' was not stopped'
- if (ret == self.stop ()):
- print 'ERROR! Peer using cfg ' + self.cfg + ' could not be stopped'
- self.started == False
- return ret
- else:
- return False
+ if (self.started == True):
+ print 'ERROR! Peer using cfg ' + self.cfg + ' was not stopped'
+ ret = self.stop ()
+ if (False == ret):
+ print 'ERROR! Peer using cfg ' + self.cfg + ' could not be stopped'
+ self.started = False
+ return ret
+ else:
+ return False
def start (self):
self.test.p ("Starting peer using cfg " + self.cfg)
try:
print "Can not start peer"
self.started = False
return False
- self.started = True
+ self.started = True;
+ test = ''
+ try:
+ server = pexpect ()
+ server.spawn (None, [self.test.gnunetpeerinfo, '-c', self.cfg ,'-s'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ test = server.read("stdout", 1024)
+ except OSError:
+ print "Can not get peer identity"
+ test = (test.split('`')[1])
+ self.id = test.split('\'')[0]
return True
def stop (self):
- if (self.started == False):
- return False
+ if (self.started == False):
+ return False
self.test.p ("Stopping peer using cfg " + self.cfg)
try:
server = subprocess.Popen ([self.test.gnunetarm, '-eq', '-c', self.cfg])
self.started = False
return True;
def get_statistics_value (self, subsystem, name):
- from gnunet_pyexpect import pexpect
server = pexpect ()
server.spawn (None, [self.test.gnunetstatistics, '-c', self.cfg ,'-q','-n', name, '-s', subsystem ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
#server.expect ("stdout", re.compile (r""))