# This file is part of GNUnet.
# (C) 2010 Christian Grothoff (and other contributing authors)
#
# GNUnet is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2, or (at your
# option) any later version.
#
# GNUnet is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNUnet; see the file COPYING. If not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
#
# Testcase for gnunet-peerinfo
from __future__ import print_function

import os
import re
import subprocess
import sys

class pexpect(object):
    """Run a child process to completion and check its captured output.

    ``spawn`` starts the process and blocks in ``communicate()`` until it
    exits, storing whatever was captured in ``self.stdo`` / ``self.stde``.
    ``expect`` and ``read`` then consume those stored streams from the
    front, so successive calls see only the not-yet-consumed remainder.

    Output is only captured when the caller asks ``subprocess.Popen`` for
    it (e.g. ``stdout=subprocess.PIPE``); otherwise ``communicate()``
    yields ``None`` for that stream.
    """

    def __init__(self):
        super(pexpect, self).__init__()

    def _set_stream(self, s, value):
        """Store ``value`` as the remaining content of the stream named ``s``."""
        if s == 'stdout':
            self.stdo = value
        else:
            self.stde = value

    def spawn(self, stdin, arglist, *pargs, **kwargs):
        """Start ``arglist`` as a child process and collect its output.

        :param stdin: data fed to the child's standard input, or ``None``.
        :param arglist: argument list handed to ``subprocess.Popen``.
        :param pargs: extra positional arguments for ``subprocess.Popen``.
        :param kwargs: extra keyword arguments for ``subprocess.Popen``;
            ``env`` defaults to a copy of the current environment.
        :returns: the finished ``subprocess.Popen`` object.
        """
        env = kwargs.pop('env', None)
        # Always work on a private copy so that dropping the variable below
        # never modifies os.environ or a dict owned by the caller.
        env = dict(os.environ if env is None else env)
        # This messes up some testcases, disable log redirection
        env.pop('GNUNET_FORCE_LOGFILE', None)
        self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
        if self.proc is None:
            print("Failed to spawn a process {0}".format(arglist))
            # NOTE(review): the statement after this message was lost in the
            # source; exiting with status 1 is assumed -- confirm upstream.
            sys.exit(1)
        # communicate(None) is the same as communicate(), so a single call
        # covers both the "with stdin" and "without stdin" cases.
        self.stdo, self.stde = self.proc.communicate(stdin)
        # NOTE(review): returning the process object is assumed -- confirm.
        return self.proc

    def expect(self, s, r, flags=0):
        """Match ``r`` against a captured stream and consume the match.

        :param s: ``'stdout'`` selects captured stdout; any other value
            selects captured stderr.
        :param r: a compiled regular expression, or the special string
            ``"EOF"`` meaning "the stream must be empty".
        :param flags: additional ``re`` flags applied on top of the flags
            the pattern was compiled with.
        :returns: ``True`` for a satisfied ``"EOF"`` check, otherwise the
            match object; on a match the stream is advanced past it.
        :raises ValueError: if ``r`` is a string other than ``"EOF"``.
        :raises SystemExit: (status 2) if the expectation is not met.
        """
        stream = self.stdo if s == 'stdout' else self.stde
        if isinstance(r, str):
            # NOTE(review): the sentinel literal was lost in the source;
            # "EOF" is assumed -- confirm against upstream.
            if r == "EOF":
                if len(stream) == 0:
                    return True
                print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
                sys.exit(2)
            raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
        if flags:
            # A compiled pattern's search() takes (string, pos, endpos), so
            # passing flags there would silently skip the first `flags`
            # characters.  Re-compile with the extra flags instead.
            r = re.compile(r.pattern, r.flags | flags)
        m = r.search(stream)
        if not m:
            print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
            sys.exit(2)
        # Drop everything up to the end of the match so the next call
        # continues from there.
        self._set_stream(s, stream[m.end():])
        return m

    def read(self, s, size=-1):
        """Remove and return data from the front of a captured stream.

        :param s: ``'stdout'`` selects captured stdout; any other value
            selects captured stderr.
        :param size: number of items to take; a negative value takes
            everything that is left.
        :returns: the data taken; the stream keeps only the remainder.
        """
        stream = self.stdo if s == 'stdout' else self.stde
        if size < 0:
            # stream[:0] is an empty value of the same type (str or bytes).
            result, new_stream = stream, stream[:0]
        else:
            result, new_stream = stream[0:size], stream[size:]
        self._set_stream(s, new_stream)
        return result