Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / contrib / scripts / gnunet_pyexpect.py.in
1 #!@PYTHON@
2 #    This file is part of GNUnet.
3 #    (C) 2010, 2018 Christian Grothoff (and other contributing authors)
4 #
5 #    GNUnet is free software: you can redistribute it and/or modify it
6 #    under the terms of the GNU Affero General Public License as published
7 #    by the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    GNUnet is distributed in the hope that it will be useful, but
11 #    WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #    Affero General Public License for more details.
14 #   
15 #    You should have received a copy of the GNU Affero General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Testcase for gnunet-peerinfo
19 from __future__ import print_function
20 import os
21 import re
22 import subprocess
23 import sys
24 import shutil
25 import time
26
27
28 class pexpect (object):
29     def __init__(self):
30         super(pexpect, self).__init__()
31
32     def spawn(self, stdin, arglist, *pargs, **kwargs):
33         env = kwargs.pop('env', None)
34         if env is None:
35             env = os.environ.copy()
36         # This messes up some testcases, disable log redirection
37         env.pop('GNUNET_FORCE_LOGFILE', None)
38         self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
39         if self.proc is None:
40             print("Failed to spawn a process {0}".format(arglist))
41             sys.exit(1)
42         if stdin is not None:
43             self.stdo, self.stde = self.proc.communicate(stdin)
44         else:
45             self.stdo, self.stde = self.proc.communicate()
46         return self.proc
47
48     def expect(self, s, r, flags=0):
49         stream = self.stdo if s == 'stdout' else self.stde
50         if isinstance(r, str):
51             if r == "EOF":
52                 if len(stream) == 0:
53                     return True
54                 else:
55                     print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
56                     sys.exit(2)
57             raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
58         m = r.search(stream.decode(), flags)
59         if not m:
60             print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
61             sys.exit(2)
62         stream = stream[m.end():]
63         if s == 'stdout':
64             self.stdo = stream
65         else:
66             self.stde = stream
67         return m
68
69     def read(self, s, size=-1):
70         stream = self.stdo if s == 'stdout' else self.stde
71         result = ""
72         if size < 0:
73             result = stream
74             new_stream = ""
75         else:
76             result = stream[0:size]
77             new_stream = stream[size:]
78         if s == 'stdout':
79             self.stdo = new_stream
80         else:
81             self.stde = new_stream
82         return result