2 # This file is part of GNUnet.
3 # (C) 2010, 2018 Christian Grothoff (and other contributing authors)
5 # GNUnet is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Affero General Public License as published
7 # by the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # GNUnet is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Affero General Public License for more details.
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # SPDX-License-Identifier: AGPL3.0-or-later
20 # Testcase for gnunet-peerinfo
21 from __future__ import print_function
30 class pexpect (object):
32 super(pexpect, self).__init__()
34 def spawn(self, stdin, arglist, *pargs, **kwargs):
35 env = kwargs.pop('env', None)
37 env = os.environ.copy()
38 # This messes up some testcases, disable log redirection
39 env.pop('GNUNET_FORCE_LOGFILE', None)
40 self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
42 print("Failed to spawn a process {0}".format(arglist))
45 self.stdo, self.stde = self.proc.communicate(stdin)
47 self.stdo, self.stde = self.proc.communicate()
50 def expect(self, s, r, flags=0):
51 stream = self.stdo if s == 'stdout' else self.stde
52 if isinstance(r, str):
57 print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
59 raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
60 m = r.search(stream.decode(), flags)
62 print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
64 stream = stream[m.end():]
71 def read(self, s, size=-1):
72 stream = self.stdo if s == 'stdout' else self.stde
78 result = stream[0:size]
79 new_stream = stream[size:]
81 self.stdo = new_stream
83 self.stde = new_stream