rename symbol: CONFIG_KIRKWOOD -> CONFIG_ARCH_KIRKWOOD
[oweals/u-boot.git] / tools / patman / cros_subprocess.py
1 # Copyright (c) 2012 The Chromium OS Authors.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4 #
5 # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6 # Licensed to PSF under a Contributor Agreement.
7 # See http://www.python.org/2.4/license for licensing details.
8
9 """Subprocess execution
10
11 This module holds a subclass of subprocess.Popen with our own required
12 features, mainly that we get access to the subprocess output while it
13 is running rather than just at the end. This makes it easier to show
14 progress information and filter output in real time.
15 """
16
17 import errno
18 import os
19 import pty
20 import select
21 import subprocess
22 import sys
23 import unittest
24
25
26 # Import these here so the caller does not need to import subprocess also.
27 PIPE = subprocess.PIPE
28 STDOUT = subprocess.STDOUT
29 PIPE_PTY = -3     # Pipe output through a pty
30 stay_alive = True
31
32
33 class Popen(subprocess.Popen):
34     """Like subprocess.Popen with ptys and incremental output
35
36     This class deals with running a child process and filtering its output on
37     both stdout and stderr while it is running. We do this so we can monitor
38     progress, and possibly relay the output to the user if requested.
39
40     The class is similar to subprocess.Popen, the equivalent is something like:
41
42         Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
43
44     But this class has many fewer features, and two enhancement:
45
46     1. Rather than getting the output data only at the end, this class sends it
47          to a provided operation as it arrives.
48     2. We use pseudo terminals so that the child will hopefully flush its output
49          to us as soon as it is produced, rather than waiting for the end of a
50          line.
51
52     Use CommunicateFilter() to handle output from the subprocess.
53
54     """
55
56     def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
57                  shell=False, cwd=None, env=None, **kwargs):
58         """Cut-down constructor
59
60         Args:
61             args: Program and arguments for subprocess to execute.
62             stdin: See subprocess.Popen()
63             stdout: See subprocess.Popen(), except that we support the sentinel
64                     value of cros_subprocess.PIPE_PTY.
65             stderr: See subprocess.Popen(), except that we support the sentinel
66                     value of cros_subprocess.PIPE_PTY.
67             shell: See subprocess.Popen()
68             cwd: Working directory to change to for subprocess, or None if none.
69             env: Environment to use for this subprocess, or None to inherit parent.
70             kwargs: No other arguments are supported at the moment.    Passing other
71                     arguments will cause a ValueError to be raised.
72         """
73         stdout_pty = None
74         stderr_pty = None
75
76         if stdout == PIPE_PTY:
77             stdout_pty = pty.openpty()
78             stdout = os.fdopen(stdout_pty[1])
79         if stderr == PIPE_PTY:
80             stderr_pty = pty.openpty()
81             stderr = os.fdopen(stderr_pty[1])
82
83         super(Popen, self).__init__(args, stdin=stdin,
84                 stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
85                 **kwargs)
86
87         # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
88         # We want to use the master half on our end from now on.    Setting this here
89         # does make some assumptions about the implementation of subprocess, but
90         # those assumptions are pretty minor.
91
92         # Note that if stderr is STDOUT, then self.stderr will be set to None by
93         # this constructor.
94         if stdout_pty is not None:
95             self.stdout = os.fdopen(stdout_pty[0])
96         if stderr_pty is not None:
97             self.stderr = os.fdopen(stderr_pty[0])
98
99         # Insist that unit tests exist for other arguments we don't support.
100         if kwargs:
101             raise ValueError("Unit tests do not test extra args - please add tests")
102
103     def ConvertData(self, data):
104         """Convert stdout/stderr data to the correct format for output
105
106         Args:
107             data: Data to convert, or None for ''
108
109         Returns:
110             Converted data, as bytes
111         """
112         if data is None:
113             return b''
114         return data
115
116     def CommunicateFilter(self, output):
117         """Interact with process: Read data from stdout and stderr.
118
119         This method runs until end-of-file is reached, then waits for the
120         subprocess to terminate.
121
122         The output function is sent all output from the subprocess and must be
123         defined like this:
124
125             def Output([self,] stream, data)
126             Args:
127                 stream: the stream the output was received on, which will be
128                         sys.stdout or sys.stderr.
129                 data: a string containing the data
130
131         Note: The data read is buffered in memory, so do not use this
132         method if the data size is large or unlimited.
133
134         Args:
135             output: Function to call with each fragment of output.
136
137         Returns:
138             A tuple (stdout, stderr, combined) which is the data received on
139             stdout, stderr and the combined data (interleaved stdout and stderr).
140
141             Note that the interleaved output will only be sensible if you have
142             set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
143             the timing of the output in the subprocess. If a subprocess flips
144             between stdout and stderr quickly in succession, by the time we come to
145             read the output from each we may see several lines in each, and will read
146             all the stdout lines, then all the stderr lines. So the interleaving
147             may not be correct. In this case you might want to pass
148             stderr=cros_subprocess.STDOUT to the constructor.
149
150             This feature is still useful for subprocesses where stderr is
151             rarely used and indicates an error.
152
153             Note also that if you set stderr to STDOUT, then stderr will be empty
154             and the combined output will just be the same as stdout.
155         """
156
157         read_set = []
158         write_set = []
159         stdout = None # Return
160         stderr = None # Return
161
162         if self.stdin:
163             # Flush stdio buffer.    This might block, if the user has
164             # been writing to .stdin in an uncontrolled fashion.
165             self.stdin.flush()
166             if input:
167                 write_set.append(self.stdin)
168             else:
169                 self.stdin.close()
170         if self.stdout:
171             read_set.append(self.stdout)
172             stdout = b''
173         if self.stderr and self.stderr != self.stdout:
174             read_set.append(self.stderr)
175             stderr = b''
176         combined = b''
177
178         input_offset = 0
179         while read_set or write_set:
180             try:
181                 rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
182             except select.error as e:
183                 if e.args[0] == errno.EINTR:
184                     continue
185                 raise
186
187             if not stay_alive:
188                     self.terminate()
189
190             if self.stdin in wlist:
191                 # When select has indicated that the file is writable,
192                 # we can write up to PIPE_BUF bytes without risk
193                 # blocking.    POSIX defines PIPE_BUF >= 512
194                 chunk = input[input_offset : input_offset + 512]
195                 bytes_written = os.write(self.stdin.fileno(), chunk)
196                 input_offset += bytes_written
197                 if input_offset >= len(input):
198                     self.stdin.close()
199                     write_set.remove(self.stdin)
200
201             if self.stdout in rlist:
202                 data = b''
203                 # We will get an error on read if the pty is closed
204                 try:
205                     data = os.read(self.stdout.fileno(), 1024)
206                 except OSError:
207                     pass
208                 if not len(data):
209                     self.stdout.close()
210                     read_set.remove(self.stdout)
211                 else:
212                     stdout += data
213                     combined += data
214                     if output:
215                         output(sys.stdout, data)
216             if self.stderr in rlist:
217                 data = b''
218                 # We will get an error on read if the pty is closed
219                 try:
220                     data = os.read(self.stderr.fileno(), 1024)
221                 except OSError:
222                     pass
223                 if not len(data):
224                     self.stderr.close()
225                     read_set.remove(self.stderr)
226                 else:
227                     stderr += data
228                     combined += data
229                     if output:
230                         output(sys.stderr, data)
231
232         # All data exchanged.    Translate lists into strings.
233         stdout = self.ConvertData(stdout)
234         stderr = self.ConvertData(stderr)
235         combined = self.ConvertData(combined)
236
237         # Translate newlines, if requested.    We cannot let the file
238         # object do the translation: It is based on stdio, which is
239         # impossible to combine with select (unless forcing no
240         # buffering).
241         if self.universal_newlines and hasattr(file, 'newlines'):
242             if stdout:
243                 stdout = self._translate_newlines(stdout)
244             if stderr:
245                 stderr = self._translate_newlines(stderr)
246
247         self.wait()
248         return (stdout, stderr, combined)
249
250
251 # Just being a unittest.TestCase gives us 14 public methods.    Unless we
252 # disable this, we can only have 6 tests in a TestCase.    That's not enough.
253 #
254 # pylint: disable=R0904
255
256 class TestSubprocess(unittest.TestCase):
257     """Our simple unit test for this module"""
258
259     class MyOperation:
260         """Provides a operation that we can pass to Popen"""
261         def __init__(self, input_to_send=None):
262             """Constructor to set up the operation and possible input.
263
264             Args:
265                 input_to_send: a text string to send when we first get input. We will
266                     add \r\n to the string.
267             """
268             self.stdout_data = ''
269             self.stderr_data = ''
270             self.combined_data = ''
271             self.stdin_pipe = None
272             self._input_to_send = input_to_send
273             if input_to_send:
274                 pipe = os.pipe()
275                 self.stdin_read_pipe = pipe[0]
276                 self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
277
278         def Output(self, stream, data):
279             """Output handler for Popen. Stores the data for later comparison"""
280             if stream == sys.stdout:
281                 self.stdout_data += data
282             if stream == sys.stderr:
283                 self.stderr_data += data
284             self.combined_data += data
285
286             # Output the input string if we have one.
287             if self._input_to_send:
288                 self._stdin_write_pipe.write(self._input_to_send + '\r\n')
289                 self._stdin_write_pipe.flush()
290
291     def _BasicCheck(self, plist, oper):
292         """Basic checks that the output looks sane."""
293         self.assertEqual(plist[0], oper.stdout_data)
294         self.assertEqual(plist[1], oper.stderr_data)
295         self.assertEqual(plist[2], oper.combined_data)
296
297         # The total length of stdout and stderr should equal the combined length
298         self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
299
300     def test_simple(self):
301         """Simple redirection: Get process list"""
302         oper = TestSubprocess.MyOperation()
303         plist = Popen(['ps']).CommunicateFilter(oper.Output)
304         self._BasicCheck(plist, oper)
305
306     def test_stderr(self):
307         """Check stdout and stderr"""
308         oper = TestSubprocess.MyOperation()
309         cmd = 'echo fred >/dev/stderr && false || echo bad'
310         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
311         self._BasicCheck(plist, oper)
312         self.assertEqual(plist [0], 'bad\r\n')
313         self.assertEqual(plist [1], 'fred\r\n')
314
315     def test_shell(self):
316         """Check with and without shell works"""
317         oper = TestSubprocess.MyOperation()
318         cmd = 'echo test >/dev/stderr'
319         self.assertRaises(OSError, Popen, [cmd], shell=False)
320         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
321         self._BasicCheck(plist, oper)
322         self.assertEqual(len(plist [0]), 0)
323         self.assertEqual(plist [1], 'test\r\n')
324
325     def test_list_args(self):
326         """Check with and without shell works using list arguments"""
327         oper = TestSubprocess.MyOperation()
328         cmd = ['echo', 'test', '>/dev/stderr']
329         plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
330         self._BasicCheck(plist, oper)
331         self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
332         self.assertEqual(len(plist [1]), 0)
333
334         oper = TestSubprocess.MyOperation()
335
336         # this should be interpreted as 'echo' with the other args dropped
337         cmd = ['echo', 'test', '>/dev/stderr']
338         plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
339         self._BasicCheck(plist, oper)
340         self.assertEqual(plist [0], '\r\n')
341
342     def test_cwd(self):
343         """Check we can change directory"""
344         for shell in (False, True):
345             oper = TestSubprocess.MyOperation()
346             plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
347             self._BasicCheck(plist, oper)
348             self.assertEqual(plist [0], '/tmp\r\n')
349
350     def test_env(self):
351         """Check we can change environment"""
352         for add in (False, True):
353             oper = TestSubprocess.MyOperation()
354             env = os.environ
355             if add:
356                 env ['FRED'] = 'fred'
357             cmd = 'echo $FRED'
358             plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
359             self._BasicCheck(plist, oper)
360             self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
361
362     def test_extra_args(self):
363         """Check we can't add extra arguments"""
364         self.assertRaises(ValueError, Popen, 'true', close_fds=False)
365
366     def test_basic_input(self):
367         """Check that incremental input works
368
369         We set up a subprocess which will prompt for name. When we see this prompt
370         we send the name as input to the process. It should then print the name
371         properly to stdout.
372         """
373         oper = TestSubprocess.MyOperation('Flash')
374         prompt = 'What is your name?: '
375         cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
376         plist = Popen([cmd], stdin=oper.stdin_read_pipe,
377                 shell=True).CommunicateFilter(oper.Output)
378         self._BasicCheck(plist, oper)
379         self.assertEqual(len(plist [1]), 0)
380         self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
381
382     def test_isatty(self):
383         """Check that ptys appear as terminals to the subprocess"""
384         oper = TestSubprocess.MyOperation()
385         cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
386                 'else echo "not %d" >&%d; fi;')
387         both_cmds = ''
388         for fd in (1, 2):
389             both_cmds += cmd % (fd, fd, fd, fd, fd)
390         plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
391         self._BasicCheck(plist, oper)
392         self.assertEqual(plist [0], 'terminal 1\r\n')
393         self.assertEqual(plist [1], 'terminal 2\r\n')
394
395         # Now try with PIPE and make sure it is not a terminal
396         oper = TestSubprocess.MyOperation()
397         plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
398                 shell=True).CommunicateFilter(oper.Output)
399         self._BasicCheck(plist, oper)
400         self.assertEqual(plist [0], 'not 1\n')
401         self.assertEqual(plist [1], 'not 2\n')
402
403 if __name__ == '__main__':
404     unittest.main()