configure ATS ports conflict-free for tests
[oweals/gnunet.git] / contrib / gnunet_pyexpect.py.in
1 #!@PYTHON@\r
2 #    This file is part of GNUnet.\r
3 #    (C) 2010 Christian Grothoff (and other contributing authors)\r
4 #\r
5 #    GNUnet is free software; you can redistribute it and/or modify\r
6 #    it under the terms of the GNU General Public License as published\r
7 #    by the Free Software Foundation; either version 2, or (at your\r
8 #    option) any later version.\r
9 #\r
10 #    GNUnet is distributed in the hope that it will be useful, but\r
11 #    WITHOUT ANY WARRANTY; without even the implied warranty of\r
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU\r
13 #    General Public License for more details.\r
14 #\r
15 #    You should have received a copy of the GNU General Public License\r
16 #    along with GNUnet; see the file COPYING.  If not, write to the\r
17 #    Free Software Foundation, Inc., 59 Temple Place - Suite 330,\r
18 #    Boston, MA 02111-1307, USA.\r
19 #\r
20 # Testcase for gnunet-peerinfo\r
21 from __future__ import print_function\r
22 import os\r
23 import re\r
24 import subprocess\r
25 import sys\r
26 import shutil\r
27 import time\r
28 \r
29 class pexpect (object):\r
30   def __init__ (self):\r
31     super (pexpect, self).__init__ ()\r
32 \r
33   def spawn (self, stdin, arglist, *pargs, **kwargs):\r
34     self.proc = subprocess.Popen (arglist, *pargs, **kwargs)\r
35     if self.proc is None:\r
36       print ("Failed to spawn a process {0}".format (arglist))\r
37       sys.exit (1)\r
38     if stdin is not None:\r
39       self.stdo, self.stde = self.proc.communicate (stdin)\r
40     else:\r
41       self.stdo, self.stde = self.proc.communicate ()\r
42     return self.proc\r
43 \r
44   def expect (self, s, r, flags=0):\r
45     stream = self.stdo if s == 'stdout' else self.stde\r
46     if isinstance (r, str):\r
47       if r == "EOF":\r
48         if len (stream) == 0:\r
49           return True\r
50         else:\r
51           print ("Failed to find `{1}' in {0}, which is `{2}' ({3})".format (s, r, stream, len (stream)))\r
52           sys.exit (2)\r
53       raise ValueError ("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format (r))\r
54     m = r.search (stream, flags)\r
55     if not m:\r
56       print ("Failed to find `{1}' in {0}, which is is `{2}'".format (s, r.pattern, stream))\r
57       sys.exit (2)\r
58     stream = stream[m.end ():]\r
59     if s == 'stdout':\r
60       self.stdo = stream\r
61     else:\r
62       self.stde = stream\r
63     return m\r
64 \r
65   def read (self, s, size=-1):\r
66     stream = self.stdo if s == 'stdout' else self.stde\r
67     result = ""\r
68     if size < 0:\r
69       result = stream\r
70       new_stream = ""\r
71     else:\r
72       result = stream[0:size]\r
73       new_stream = stream[size:]\r
74     if s == 'stdout':\r
75       self.stdo = new_stream\r
76     else:\r
77       self.stde = new_stream\r
78     return result\r