Merge tag 'dm-pull-9jul19-take2' of https://gitlab.denx.de/u-boot/custodians/u-boot-dm
[oweals/u-boot.git] / tools / patman / cros_subprocess.py
1 # Copyright (c) 2012 The Chromium OS Authors.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4 #
5 # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6 # Licensed to PSF under a Contributor Agreement.
7 # See http://www.python.org/2.4/license for licensing details.
8
9 """Subprocress execution
10
11 This module holds a subclass of subprocess.Popen with our own required
12 features, mainly that we get access to the subprocess output while it
13 is running rather than just at the end. This makes it easiler to show
14 progress information and filter output in real time.
15 """
16
17 import errno
18 import os
19 import pty
20 import select
21 import subprocess
22 import sys
23 import unittest
24
25
26 # Import these here so the caller does not need to import subprocess also.
27 PIPE = subprocess.PIPE
28 STDOUT = subprocess.STDOUT
29 PIPE_PTY = -3     # Pipe output through a pty
30 stay_alive = True
31
32
33 class Popen(subprocess.Popen):
34     """Like subprocess.Popen with ptys and incremental output
35
36     This class deals with running a child process and filtering its output on
37     both stdout and stderr while it is running. We do this so we can monitor
38     progress, and possibly relay the output to the user if requested.
39
40     The class is similar to subprocess.Popen, the equivalent is something like:
41
42         Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
43
44     But this class has many fewer features, and two enhancement:
45
46     1. Rather than getting the output data only at the end, this class sends it
47          to a provided operation as it arrives.
48     2. We use pseudo terminals so that the child will hopefully flush its output
49          to us as soon as it is produced, rather than waiting for the end of a
50          line.
51
52     Use CommunicateFilter() to handle output from the subprocess.
53
54     """
55
56     def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
57                  shell=False, cwd=None, env=None, binary=False, **kwargs):
58         """Cut-down constructor
59
60         Args:
61             args: Program and arguments for subprocess to execute.
62             stdin: See subprocess.Popen()
63             stdout: See subprocess.Popen(), except that we support the sentinel
64                     value of cros_subprocess.PIPE_PTY.
65             stderr: See subprocess.Popen(), except that we support the sentinel
66                     value of cros_subprocess.PIPE_PTY.
67             shell: See subprocess.Popen()
68             cwd: Working directory to change to for subprocess, or None if none.
69             env: Environment to use for this subprocess, or None to inherit parent.
70             kwargs: No other arguments are supported at the moment.    Passing other
71                     arguments will cause a ValueError to be raised.
72         """
73         stdout_pty = None
74         stderr_pty = None
75         self.binary = binary
76
77         if stdout == PIPE_PTY:
78             stdout_pty = pty.openpty()
79             stdout = os.fdopen(stdout_pty[1])
80         if stderr == PIPE_PTY:
81             stderr_pty = pty.openpty()
82             stderr = os.fdopen(stderr_pty[1])
83
84         super(Popen, self).__init__(args, stdin=stdin,
85                 stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
86                 **kwargs)
87
88         # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
89         # We want to use the master half on our end from now on.    Setting this here
90         # does make some assumptions about the implementation of subprocess, but
91         # those assumptions are pretty minor.
92
93         # Note that if stderr is STDOUT, then self.stderr will be set to None by
94         # this constructor.
95         if stdout_pty is not None:
96             self.stdout = os.fdopen(stdout_pty[0])
97         if stderr_pty is not None:
98             self.stderr = os.fdopen(stderr_pty[0])
99
100         # Insist that unit tests exist for other arguments we don't support.
101         if kwargs:
102             raise ValueError("Unit tests do not test extra args - please add tests")
103
104     def ConvertData(self, data):
105         """Convert stdout/stderr data to the correct format for output
106
107         Args:
108             data: Data to convert, or None for ''
109
110         Returns:
111             Converted data, as bytes
112         """
113         if data is None:
114             return b''
115         return data
116
117     def CommunicateFilter(self, output):
118         """Interact with process: Read data from stdout and stderr.
119
120         This method runs until end-of-file is reached, then waits for the
121         subprocess to terminate.
122
123         The output function is sent all output from the subprocess and must be
124         defined like this:
125
126             def Output([self,] stream, data)
127             Args:
128                 stream: the stream the output was received on, which will be
129                         sys.stdout or sys.stderr.
130                 data: a string containing the data
131
132         Note: The data read is buffered in memory, so do not use this
133         method if the data size is large or unlimited.
134
135         Args:
136             output: Function to call with each fragment of output.
137
138         Returns:
139             A tuple (stdout, stderr, combined) which is the data received on
140             stdout, stderr and the combined data (interleaved stdout and stderr).
141
142             Note that the interleaved output will only be sensible if you have
143             set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
144             the timing of the output in the subprocess. If a subprocess flips
145             between stdout and stderr quickly in succession, by the time we come to
146             read the output from each we may see several lines in each, and will read
147             all the stdout lines, then all the stderr lines. So the interleaving
148             may not be correct. In this case you might want to pass
149             stderr=cros_subprocess.STDOUT to the constructor.
150
151             This feature is still useful for subprocesses where stderr is
152             rarely used and indicates an error.
153
154             Note also that if you set stderr to STDOUT, then stderr will be empty
155             and the combined output will just be the same as stdout.
156         """
157
158         read_set = []
159         write_set = []
160         stdout = None # Return
161         stderr = None # Return
162
163         if self.stdin:
164             # Flush stdio buffer.    This might block, if the user has
165             # been writing to .stdin in an uncontrolled fashion.
166             self.stdin.flush()
167             if input:
168                 write_set.append(self.stdin)
169             else:
170                 self.stdin.close()
171         if self.stdout:
172             read_set.append(self.stdout)
173             stdout = b''
174         if self.stderr and self.stderr != self.stdout:
175             read_set.append(self.stderr)
176             stderr = b''
177         combined = b''
178
179         input_offset = 0
180         while read_set or write_set:
181             try:
182                 rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
183             except select.error as e:
184                 if e.args[0] == errno.EINTR:
185                     continue
186                 raise
187
188             if not stay_alive:
189                     self.terminate()
190
191             if self.stdin in wlist:
192                 # When select has indicated that the file is writable,
193                 # we can write up to PIPE_BUF bytes without risk
194                 # blocking.    POSIX defines PIPE_BUF >= 512
195                 chunk = input[input_offset : input_offset + 512]
196                 bytes_written = os.write(self.stdin.fileno(), chunk)
197                 input_offset += bytes_written
198                 if input_offset >= len(input):
199                     self.stdin.close()
200                     write_set.remove(self.stdin)
201
202             if self.stdout in rlist:
203                 data = b''
204                 # We will get an error on read if the pty is closed
205                 try:
206                     data = os.read(self.stdout.fileno(), 1024)
207                 except OSError:
208                     pass
209                 if not len(data):
210                     self.stdout.close()
211                     read_set.remove(self.stdout)
212                 else:
213                     stdout += data
214                     combined += data
215                     if output:
216                         output(sys.stdout, data)
217             if self.stderr in rlist:
218                 data = b''
219                 # We will get an error on read if the pty is closed
220                 try:
221                     data = os.read(self.stderr.fileno(), 1024)
222                 except OSError:
223                     pass
224                 if not len(data):
225                     self.stderr.close()
226                     read_set.remove(self.stderr)
227                 else:
228                     stderr += data
229                     combined += data
230                     if output:
231                         output(sys.stderr, data)
232
233         # All data exchanged.    Translate lists into strings.
234         stdout = self.ConvertData(stdout)
235         stderr = self.ConvertData(stderr)
236         combined = self.ConvertData(combined)
237
238         # Translate newlines, if requested.    We cannot let the file
239         # object do the translation: It is based on stdio, which is
240         # impossible to combine with select (unless forcing no
241         # buffering).
242         if self.universal_newlines and hasattr(file, 'newlines'):
243             if stdout:
244                 stdout = self._translate_newlines(stdout)
245             if stderr:
246                 stderr = self._translate_newlines(stderr)
247
248         self.wait()
249         return (stdout, stderr, combined)
250
251
252 # Just being a unittest.TestCase gives us 14 public methods.    Unless we
253 # disable this, we can only have 6 tests in a TestCase.    That's not enough.
254 #
255 # pylint: disable=R0904
256
257 class TestSubprocess(unittest.TestCase):
258     """Our simple unit test for this module"""
259
260     class MyOperation:
261         """Provides a operation that we can pass to Popen"""
262         def __init__(self, input_to_send=None):
263             """Constructor to set up the operation and possible input.
264
265             Args:
266                 input_to_send: a text string to send when we first get input. We will
267                     add \r\n to the string.
268             """
269             self.stdout_data = ''
270             self.stderr_data = ''
271             self.combined_data = ''
272             self.stdin_pipe = None
273             self._input_to_send = input_to_send
274             if input_to_send:
275                 pipe = os.pipe()
276                 self.stdin_read_pipe = pipe[0]
277                 self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
278
279         def Output(self, stream, data):
280             """Output handler for Popen. Stores the data for later comparison"""
281             if stream == sys.stdout:
282                 self.stdout_data += data
283             if stream == sys.stderr:
284                 self.stderr_data += data
285             self.combined_data += data
286
287             # Output the input string if we have one.
288             if self._input_to_send:
289                 self._stdin_write_pipe.write(self._input_to_send + '\r\n')
290                 self._stdin_write_pipe.flush()
291
292     def _BasicCheck(self, plist, oper):
293         """Basic checks that the output looks sane."""
294         self.assertEqual(plist[0], oper.stdout_data)
295         self.assertEqual(plist[1], oper.stderr_data)
296         self.assertEqual(plist[2], oper.combined_data)
297
298         # The total length of stdout and stderr should equal the combined length
299         self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
300
301     def test_simple(self):
302         """Simple redirection: Get process list"""
303         oper = TestSubprocess.MyOperation()
304         plist = Popen(['ps']).CommunicateFilter(oper.Output)
305         self._BasicCheck(plist, oper)
306
307     def test_stderr(self):
308         """Check stdout and stderr"""
309         oper = TestSubprocess.MyOperation()
310         cmd = 'echo fred >/dev/stderr && false || echo bad'
311         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
312         self._BasicCheck(plist, oper)
313         self.assertEqual(plist [0], 'bad\r\n')
314         self.assertEqual(plist [1], 'fred\r\n')
315
316     def test_shell(self):
317         """Check with and without shell works"""
318         oper = TestSubprocess.MyOperation()
319         cmd = 'echo test >/dev/stderr'
320         self.assertRaises(OSError, Popen, [cmd], shell=False)
321         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
322         self._BasicCheck(plist, oper)
323         self.assertEqual(len(plist [0]), 0)
324         self.assertEqual(plist [1], 'test\r\n')
325
326     def test_list_args(self):
327         """Check with and without shell works using list arguments"""
328         oper = TestSubprocess.MyOperation()
329         cmd = ['echo', 'test', '>/dev/stderr']
330         plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
331         self._BasicCheck(plist, oper)
332         self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
333         self.assertEqual(len(plist [1]), 0)
334
335         oper = TestSubprocess.MyOperation()
336
337         # this should be interpreted as 'echo' with the other args dropped
338         cmd = ['echo', 'test', '>/dev/stderr']
339         plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
340         self._BasicCheck(plist, oper)
341         self.assertEqual(plist [0], '\r\n')
342
343     def test_cwd(self):
344         """Check we can change directory"""
345         for shell in (False, True):
346             oper = TestSubprocess.MyOperation()
347             plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
348             self._BasicCheck(plist, oper)
349             self.assertEqual(plist [0], '/tmp\r\n')
350
351     def test_env(self):
352         """Check we can change environment"""
353         for add in (False, True):
354             oper = TestSubprocess.MyOperation()
355             env = os.environ
356             if add:
357                 env ['FRED'] = 'fred'
358             cmd = 'echo $FRED'
359             plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
360             self._BasicCheck(plist, oper)
361             self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
362
363     def test_extra_args(self):
364         """Check we can't add extra arguments"""
365         self.assertRaises(ValueError, Popen, 'true', close_fds=False)
366
367     def test_basic_input(self):
368         """Check that incremental input works
369
370         We set up a subprocess which will prompt for name. When we see this prompt
371         we send the name as input to the process. It should then print the name
372         properly to stdout.
373         """
374         oper = TestSubprocess.MyOperation('Flash')
375         prompt = 'What is your name?: '
376         cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
377         plist = Popen([cmd], stdin=oper.stdin_read_pipe,
378                 shell=True).CommunicateFilter(oper.Output)
379         self._BasicCheck(plist, oper)
380         self.assertEqual(len(plist [1]), 0)
381         self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
382
383     def test_isatty(self):
384         """Check that ptys appear as terminals to the subprocess"""
385         oper = TestSubprocess.MyOperation()
386         cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
387                 'else echo "not %d" >&%d; fi;')
388         both_cmds = ''
389         for fd in (1, 2):
390             both_cmds += cmd % (fd, fd, fd, fd, fd)
391         plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
392         self._BasicCheck(plist, oper)
393         self.assertEqual(plist [0], 'terminal 1\r\n')
394         self.assertEqual(plist [1], 'terminal 2\r\n')
395
396         # Now try with PIPE and make sure it is not a terminal
397         oper = TestSubprocess.MyOperation()
398         plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
399                 shell=True).CommunicateFilter(oper.Output)
400         self._BasicCheck(plist, oper)
401         self.assertEqual(plist [0], 'not 1\n')
402         self.assertEqual(plist [1], 'not 2\n')
403
404 if __name__ == '__main__':
405     unittest.main()