# Boston, MA 02111-1307, USA.
#
# Testcase for gnunet-peerinfo
-#import pexpect
-from __future__ import print_function
+import sys
import os
-#import signal
-import re
import subprocess
-import sys
+import re
import shutil
import time
-class pexpect (object):
- def __init__ (self):
- super (pexpect, self).__init__ ()
-
- def spawn (self, stdin, arglist, *pargs, **kwargs):
- self.proc = subprocess.Popen (arglist, *pargs, **kwargs)
- if self.proc is None:
- print ("Failed to spawn a process {0}".format (arglist))
- sys.exit (1)
- if stdin is not None:
- self.stdo, self.stde = self.proc.communicate (stdin)
- else:
- self.stdo, self.stde = self.proc.communicate ()
- return self.proc
-
- def expect (self, s, r, flags=0):
- stream = self.stdo if s == 'stdout' else self.stde
- if isinstance (r, str):
- if r == "EOF":
- if len (stream) == 0:
- return True
- else:
- print ("Failed to match {0} with `{1}'. {0} is `{2}'".format (s, r, stream))
- sys.exit (2)
- raise ValueError ("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format (r))
- m = r.match (stream, flags)
- if not m:
- print ("Failed to match {0} with `{1}'. {0} is `{2}'".format (s, r.pattern, stream))
- sys.exit (2)
- stream = stream[m.end ():]
- if s == 'stdout':
- self.stdo = stream
- else:
- self.stde = stream
- return m
+srcdir = "../.."
+gnunet_pyexpect_dir = os.path.join (srcdir, "contrib", "gnunet_pyexpect")
+if gnunet_pyexpect_dir not in sys.path:
+ sys.path.append (gnunet_pyexpect_dir)
- def read (self, s, size=-1):
- stream = self.stdo if s == 'stdout' else self.stde
- result = ""
- if size < 0:
- result = stream
- new_stream = ""
- else:
- result = stream[0:size]
- new_stream = stream[size:]
- if s == 'stdout':
- self.stdo = new_stream
- else:
- self.stde = new_stream
- return result
+from gnunet_pyexpect import pexpect
if os.name == 'posix':
peerinfo = 'gnunet-peerinfo'
pinfo.expect ("stdout", "EOF")
if os.name == "nt":
- shutil.rmtree (os.path.join (os.getenv ("TEMP"), "tmp", "gnunet-test-peerinfo"), True)
+ shutil.rmtree (os.path.join (os.getenv ("TEMP"), "gnunet-test-peerinfo"), True)
else:
shutil.rmtree ("/tmp/gnunet-test-peerinfo", True)
arm = subprocess.Popen ([gnunetarm, '-sq', '-c', 'test_gnunet_peerinfo_data.conf'])
try:
pinfo.spawn (None, [peerinfo, '-c', 'test_gnunet_peerinfo_data.conf', '-s'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pinfo.expect ("stdout", re.compile (r'I am peer `.*\'.\r?\n'))
- pinfo.expect ("stdout", "EOF")
pinfo.spawn (None, [peerinfo, '-c', 'test_gnunet_peerinfo_data.conf', '-qs'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pinfo.expect ("stdout", re.compile (r'.......................................................................................................\r?\n'))
- pinfo.expect ("stdout", "EOF")
pinfo.spawn (None, [peerinfo, '-c', 'test_gnunet_peerinfo_data.conf', 'invalid'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pinfo.expect ("stdout", re.compile (r'Invalid command line argument `invalid\'\r?\n'))
- pinfo.expect ("stdout", "EOF")
arm = subprocess.Popen ([gnunetarm, '-q', '-i', 'transport', '-c', 'test_gnunet_peerinfo_data.conf'])
arm.communicate ()
m = pinfo.expect ("stdout", re.compile ("\s.*:24357\r?\n"))
while len (m.group (0)) > 0:
m = pinfo.expect ("stdout", re.compile ("(\s.*:24357\r?\n|\r?\n|)"))
- pinfo.expect ("stdout", "EOF")
pinfo.spawn (None, [peerinfo, '-c', 'test_gnunet_peerinfo_data.conf', '-n'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pinfo.expect ("stdout", re.compile ("Peer `.*'\r?\n"))
m = pinfo.expect ("stdout", re.compile ("\s.*:24357\r?\n"))
while len (m.group (0)) > 0:
m = pinfo.expect ("stdout", re.compile ("(\s.*:24357\r?\n|\r?\n|)"))
- pinfo.expect ("stdout", "EOF")
pinfo.spawn (None, [peerinfo, '-c', 'test_gnunet_peerinfo_data.conf', '-qs'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pid = pinfo.read ("stdout")
arm = subprocess.Popen ([gnunetarm, '-eq', '-c', 'test_gnunet_peerinfo_data.conf'])
arm.communicate ()
if os.name == "nt":
- shutil.rmtree (os.path.join (os.getenv ("TEMP"), "tmp", "gnunet-test-peerinfo"), True)
+ shutil.rmtree (os.path.join (os.getenv ("TEMP"), "gnunet-test-peerinfo"), True)
else:
shutil.rmtree ("/tmp/gnunet-test-peerinfo", True)