Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / contrib / scripts / gnunet_pyexpect.py.in
1 #!@PYTHON@
2 #    This file is part of GNUnet.
3 #    (C) 2010, 2018 Christian Grothoff (and other contributing authors)
4 #
5 #    GNUnet is free software; you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published
7 #    by the Free Software Foundation; either version 2, or (at your
8 #    option) any later version.
9 #
10 #    GNUnet is distributed in the hope that it will be useful, but
11 #    WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 #    General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with GNUnet; see the file COPYING.  If not, write to the
17 #    Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 #    Boston, MA 02110-1301, USA.
19 #
20 # Testcase for gnunet-peerinfo
21 from __future__ import print_function
22 import os
23 import re
24 import subprocess
25 import sys
26 import shutil
27 import time
28
29
30 class pexpect (object):
31     def __init__(self):
32         super(pexpect, self).__init__()
33
34     def spawn(self, stdin, arglist, *pargs, **kwargs):
35         env = kwargs.pop('env', None)
36         if env is None:
37             env = os.environ.copy()
38         # This messes up some testcases, disable log redirection
39         env.pop('GNUNET_FORCE_LOGFILE', None)
40         self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
41         if self.proc is None:
42             print("Failed to spawn a process {0}".format(arglist))
43             sys.exit(1)
44         if stdin is not None:
45             self.stdo, self.stde = self.proc.communicate(stdin)
46         else:
47             self.stdo, self.stde = self.proc.communicate()
48         return self.proc
49
50     def expect(self, s, r, flags=0):
51         stream = self.stdo if s == 'stdout' else self.stde
52         if isinstance(r, str):
53             if r == "EOF":
54                 if len(stream) == 0:
55                     return True
56                 else:
57                     print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
58                     sys.exit(2)
59             raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
60         m = r.search(stream.decode(), flags)
61         if not m:
62             print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
63             sys.exit(2)
64         stream = stream[m.end():]
65         if s == 'stdout':
66             self.stdo = stream
67         else:
68             self.stde = stream
69         return m
70
71     def read(self, s, size=-1):
72         stream = self.stdo if s == 'stdout' else self.stde
73         result = ""
74         if size < 0:
75             result = stream
76             new_stream = ""
77         else:
78             result = stream[0:size]
79             new_stream = stream[size:]
80         if s == 'stdout':
81             self.stdo = new_stream
82         else:
83             self.stde = new_stream
84         return result