9611fc0ae586bd979814b731bb74386be883744e
[oweals/gnunet.git] / contrib / scripts / gnunet_pyexpect.py.in
1 #!@PYTHON@
2 #    This file is part of GNUnet.
3 #    (C) 2010, 2018 Christian Grothoff (and other contributing authors)
4 #
5 #    GNUnet is free software: you can redistribute it and/or modify it
6 #    under the terms of the GNU Affero General Public License as published
7 #    by the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    GNUnet is distributed in the hope that it will be useful, but
11 #    WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #    Affero General Public License for more details.
14 #   
15 #    You should have received a copy of the GNU Affero General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 #    SPDX-License-Identifier: AGPL3.0-or-later
19 #
20 # Testcase for gnunet-peerinfo
21
22 from builtins import object
23 import os
24 import re
25 import subprocess
26 import sys
27 import shutil
28 import time
29
30
31 class pexpect (object):
32     def __init__(self):
33         super(pexpect, self).__init__()
34
35     def spawn(self, stdin, arglist, *pargs, **kwargs):
36         env = kwargs.pop('env', None)
37         if env is None:
38             env = os.environ.copy()
39         # This messes up some testcases, disable log redirection
40         env.pop('GNUNET_FORCE_LOGFILE', None)
41         self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
42         if self.proc is None:
43             print("Failed to spawn a process {0}".format(arglist))
44             sys.exit(1)
45         if stdin is not None:
46             self.stdo, self.stde = self.proc.communicate(stdin)
47         else:
48             self.stdo, self.stde = self.proc.communicate()
49         return self.proc
50
51     def expect(self, s, r, flags=0):
52         stream = self.stdo if s == 'stdout' else self.stde
53         if isinstance(r, str):
54             if r == "EOF":
55                 if len(stream) == 0:
56                     return True
57                 else:
58                     print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
59                     sys.exit(2)
60             raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
61         m = r.search(stream.decode(), flags)
62         if not m:
63             print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
64             sys.exit(2)
65         stream = stream[m.end():]
66         if s == 'stdout':
67             self.stdo = stream
68         else:
69             self.stde = stream
70         return m
71
72     def read(self, s, size=-1):
73         stream = self.stdo if s == 'stdout' else self.stde
74         result = ""
75         if size < 0:
76             result = stream
77             new_stream = ""
78         else:
79             result = stream[0:size]
80             new_stream = stream[size:]
81         if s == 'stdout':
82             self.stdo = new_stream
83         else:
84             self.stde = new_stream
85         return result