X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=contrib%2Fgnunet_pyexpect.py.in;h=cfeb06d8d6df14331110a05c9b990aa0f9af128d;hb=8f62c823dfb8150c3bc970932d042be2feb83e25;hp=15d19fe5a6c002e5fbaa6747246664f4b17bae82;hpb=ab81cb16f400110163530a6eaf967224296b99aa;p=oweals%2Fgnunet.git diff --git a/contrib/gnunet_pyexpect.py.in b/contrib/gnunet_pyexpect.py.in index 15d19fe5a..cfeb06d8d 100644 --- a/contrib/gnunet_pyexpect.py.in +++ b/contrib/gnunet_pyexpect.py.in @@ -1,78 +1,83 @@ -#!@PYTHON@ -# This file is part of GNUnet. -# (C) 2010 Christian Grothoff (and other contributing authors) -# -# GNUnet is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published -# by the Free Software Foundation; either version 2, or (at your -# option) any later version. -# -# GNUnet is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNUnet; see the file COPYING. If not, write to the -# Free Software Foundation, Inc., 59 Temple Place - Suite 330, -# Boston, MA 02111-1307, USA. -# -# Testcase for gnunet-peerinfo -from __future__ import print_function -import os -import re -import subprocess -import sys -import shutil -import time - -class pexpect (object): - def __init__ (self): - super (pexpect, self).__init__ () - - def spawn (self, stdin, arglist, *pargs, **kwargs): - self.proc = subprocess.Popen (arglist, *pargs, **kwargs) - if self.proc is None: - print ("Failed to spawn a process {0}".format (arglist)) - sys.exit (1) - if stdin is not None: - self.stdo, self.stde = self.proc.communicate (stdin) - else: - self.stdo, self.stde = self.proc.communicate () - return self.proc - - def expect (self, s, r, flags=0): - stream = self.stdo if s == 'stdout' else self.stde - if isinstance (r, str): - if r == "EOF": - if len (stream) == 0: - return True - else: - print ("Failed to find `{1}' in {0}, which is `{2}' ({3})".format (s, r, stream, len (stream))) - sys.exit (2) - raise ValueError ("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format (r)) - m = r.search (stream, flags) - if not m: - print ("Failed to find `{1}' in {0}, which is is `{2}'".format (s, r.pattern, stream)) - sys.exit (2) - stream = stream[m.end ():] - if s == 'stdout': - self.stdo = stream - else: - self.stde = stream - return m - - def read (self, s, size=-1): - stream = self.stdo if s == 'stdout' else self.stde - result = "" - if size < 0: - result = stream - new_stream = "" - else: - result = stream[0:size] - new_stream = stream[size:] - if s == 'stdout': - self.stdo = new_stream - else: - self.stde = new_stream - return result +#!@PYTHON@ +# This file is part of GNUnet. +# (C) 2010 Christian Grothoff (and other contributing authors) +# +# GNUnet is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation; either version 2, or (at your +# option) any later version. +# +# GNUnet is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNUnet; see the file COPYING. If not, write to the +# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, +# Boston, MA 02110-1301, USA. +# +# Testcase for gnunet-peerinfo +from __future__ import print_function +import os +import re +import subprocess +import sys +import shutil +import time + +class pexpect (object): + def __init__ (self): + super (pexpect, self).__init__ () + + def spawn (self, stdin, arglist, *pargs, **kwargs): + env = kwargs.pop ('env', None) + if env is None: + env = os.environ.copy () + # This messes up some testcases, disable log redirection + env.pop ('GNUNET_FORCE_LOGFILE', None) + self.proc = subprocess.Popen (arglist, *pargs, env=env, **kwargs) + if self.proc is None: + print ("Failed to spawn a process {0}".format (arglist)) + sys.exit (1) + if stdin is not None: + self.stdo, self.stde = self.proc.communicate (stdin) + else: + self.stdo, self.stde = self.proc.communicate () + return self.proc + + def expect (self, s, r, flags=0): + stream = self.stdo if s == 'stdout' else self.stde + if isinstance (r, str): + if r == "EOF": + if len (stream) == 0: + return True + else: + print ("Failed to find `{1}' in {0}, which is `{2}' ({3})".format (s, r, stream, len (stream))) + sys.exit (2) + raise ValueError ("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format (r)) + m = r.search (stream.decode(), flags) + if not m: + print ("Failed to find `{1}' in {0}, which is is `{2}'".format (s, r.pattern, stream)) + sys.exit (2) + stream = stream[m.end ():] + if s == 'stdout': + self.stdo = stream + else: + self.stde = stream + return m + + def read (self, s, size=-1): + stream = self.stdo if s == 'stdout' else self.stde + result = "" + if size < 0: + result = stream + new_stream = "" + else: + result = stream[0:size] + new_stream = stream[size:] + if s == 'stdout': + self.stdo = new_stream + else: + self.stde = new_stream + return result