#!@PYTHON@
# This file is part of GNUnet.
-# (C) 2010 Christian Grothoff (and other contributing authors)
+# (C) 2010, 2018 Christian Grothoff (and other contributing authors)
#
# GNUnet is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
import shutil
import time
+
class pexpect (object):
- def __init__ (self):
- super (pexpect, self).__init__ ()
+ def __init__(self):
+ super(pexpect, self).__init__()
- def spawn (self, stdin, arglist, *pargs, **kwargs):
- env = kwargs.pop ('env', None)
- if env is None:
- env = os.environ.copy ()
- # This messes up some testcases, disable log redirection
- env.pop ('GNUNET_FORCE_LOGFILE', None)
- self.proc = subprocess.Popen (arglist, *pargs, env=env, **kwargs)
- if self.proc is None:
- print ("Failed to spawn a process {0}".format (arglist))
- sys.exit (1)
- if stdin is not None:
- self.stdo, self.stde = self.proc.communicate (stdin)
- else:
- self.stdo, self.stde = self.proc.communicate ()
- return self.proc
+ def spawn(self, stdin, arglist, *pargs, **kwargs):
+ env = kwargs.pop('env', None)
+ if env is None:
+ env = os.environ.copy()
+ # This messes up some testcases, disable log redirection
+ env.pop('GNUNET_FORCE_LOGFILE', None)
+ self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
+ if self.proc is None:
+ print("Failed to spawn a process {0}".format(arglist))
+ sys.exit(1)
+ if stdin is not None:
+ self.stdo, self.stde = self.proc.communicate(stdin)
+ else:
+ self.stdo, self.stde = self.proc.communicate()
+ return self.proc
- def expect (self, s, r, flags=0):
- stream = self.stdo if s == 'stdout' else self.stde
- if isinstance (r, str):
- if r == "EOF":
- if len (stream) == 0:
- return True
+ def expect(self, s, r, flags=0):
+ stream = self.stdo if s == 'stdout' else self.stde
+ if isinstance(r, str):
+ if r == "EOF":
+ if len(stream) == 0:
+ return True
+ else:
+ print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
+ sys.exit(2)
+ raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
+ m = r.search(stream.decode(), flags)
+ if not m:
+ print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
+ sys.exit(2)
+ stream = stream[m.end():]
+ if s == 'stdout':
+ self.stdo = stream
else:
- print ("Failed to find `{1}' in {0}, which is `{2}' ({3})".format (s, r, stream, len (stream)))
- sys.exit (2)
- raise ValueError ("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format (r))
- m = r.search (stream.decode(), flags)
- if not m:
- print ("Failed to find `{1}' in {0}, which is is `{2}'".format (s, r.pattern, stream))
- sys.exit (2)
- stream = stream[m.end ():]
- if s == 'stdout':
- self.stdo = stream
- else:
- self.stde = stream
- return m
+ self.stde = stream
+ return m
- def read (self, s, size=-1):
- stream = self.stdo if s == 'stdout' else self.stde
- result = ""
- if size < 0:
- result = stream
- new_stream = ""
- else:
- result = stream[0:size]
- new_stream = stream[size:]
- if s == 'stdout':
- self.stdo = new_stream
- else:
- self.stde = new_stream
- return result
+ def read(self, s, size=-1):
+ stream = self.stdo if s == 'stdout' else self.stde
+ result = ""
+ if size < 0:
+ result = stream
+ new_stream = ""
+ else:
+ result = stream[0:size]
+ new_stream = stream[size:]
+ if s == 'stdout':
+ self.stdo = new_stream
+ else:
+ self.stde = new_stream
+ return result