diff options
author | Tom Rini <trini@konsulko.com> | 2016-01-29 18:47:38 (GMT) |
---|---|---|
committer | Tom Rini <trini@konsulko.com> | 2016-01-29 18:47:38 (GMT) |
commit | 8a36287a019f5d7532a8a1a7da6aa96e490dbb8a (patch) | |
tree | cae7e68596b7405120720f7a81c7ffb04d6ffa6e /test/py | |
parent | 82d72a1b9967cff4908f22c57536c3660f794401 (diff) | |
parent | 26db3a617b38cc1bed1ce100381d2c4ccbb55e42 (diff) | |
download | u-boot-fsl-qoriq-8a36287a019f5d7532a8a1a7da6aa96e490dbb8a.tar.xz |
Merge git://git.denx.de/u-boot-dm
Diffstat (limited to 'test/py')
-rw-r--r-- | test/py/conftest.py | 118 | ||||
-rw-r--r-- | test/py/multiplexed_log.css | 8 | ||||
-rw-r--r-- | test/py/multiplexed_log.py | 228 | ||||
-rwxr-xr-x | test/py/test.py | 8 | ||||
-rw-r--r-- | test/py/tests/test_000_version.py | 2 | ||||
-rw-r--r-- | test/py/tests/test_dfu.py | 279 | ||||
-rw-r--r-- | test/py/tests/test_env.py | 66 | ||||
-rw-r--r-- | test/py/tests/test_help.py | 2 | ||||
-rw-r--r-- | test/py/tests/test_hush_if_test.py | 10 | ||||
-rw-r--r-- | test/py/tests/test_md.py | 13 | ||||
-rw-r--r-- | test/py/tests/test_net.py | 155 | ||||
-rw-r--r-- | test/py/tests/test_sandbox_exit.py | 6 | ||||
-rw-r--r-- | test/py/tests/test_shell_basics.py | 12 | ||||
-rw-r--r-- | test/py/tests/test_sleep.py | 8 | ||||
-rw-r--r-- | test/py/tests/test_ums.py | 243 | ||||
-rw-r--r-- | test/py/tests/test_unknown_cmd.py | 4 | ||||
-rw-r--r-- | test/py/u_boot_console_base.py | 226 | ||||
-rw-r--r-- | test/py/u_boot_console_exec_attach.py | 12 | ||||
-rw-r--r-- | test/py/u_boot_console_sandbox.py | 28 | ||||
-rw-r--r-- | test/py/u_boot_spawn.py | 47 | ||||
-rw-r--r-- | test/py/u_boot_utils.py | 209 |
21 files changed, 1286 insertions, 398 deletions
diff --git a/test/py/conftest.py b/test/py/conftest.py index e1674df..3e162ca 100644 --- a/test/py/conftest.py +++ b/test/py/conftest.py @@ -29,7 +29,7 @@ log = None console = None def mkdir_p(path): - '''Create a directory path. + """Create a directory path. This includes creating any intermediate/parent directories. Any errors caused due to already extant directories are ignored. @@ -39,7 +39,7 @@ def mkdir_p(path): Returns: Nothing. - ''' + """ try: os.makedirs(path) @@ -50,14 +50,14 @@ def mkdir_p(path): raise def pytest_addoption(parser): - '''pytest hook: Add custom command-line options to the cmdline parser. + """pytest hook: Add custom command-line options to the cmdline parser. Args: parser: The pytest command-line parser. Returns: Nothing. - ''' + """ parser.addoption('--build-dir', default=None, help='U-Boot build directory (O=)') @@ -73,14 +73,14 @@ def pytest_addoption(parser): help='Compile U-Boot before running tests') def pytest_configure(config): - '''pytest hook: Perform custom initialization at startup time. + """pytest hook: Perform custom initialization at startup time. Args: config: The pytest configuration. Returns: Nothing. - ''' + """ global log global console @@ -190,7 +190,7 @@ def pytest_configure(config): console = u_boot_console_exec_attach.ConsoleExecAttach(log, ubconfig) def pytest_generate_tests(metafunc): - '''pytest hook: parameterize test functions based on custom rules. + """pytest hook: parameterize test functions based on custom rules. If a test function takes parameter(s) (fixture names) of the form brd__xxx or env__xxx, the brd and env configuration dictionaries are consulted to @@ -202,7 +202,7 @@ def pytest_generate_tests(metafunc): Returns: Nothing. - ''' + """ subconfigs = { 'brd': console.config.brd, @@ -225,28 +225,37 @@ def pytest_generate_tests(metafunc): # ... otherwise, see if there's a key that contains a list of # values to use instead. vals = subconfig.get(fn + 's', []) - metafunc.parametrize(fn, vals) - -@pytest.fixture(scope='session') + def fixture_id(index, val): + try: + return val["fixture_id"] + except: + return fn + str(index) + ids = [fixture_id(index, val) for (index, val) in enumerate(vals)] + metafunc.parametrize(fn, vals, ids=ids) + +@pytest.fixture(scope='function') def u_boot_console(request): - '''Generate the value of a test's u_boot_console fixture. + """Generate the value of a test's u_boot_console fixture. Args: request: The pytest request. Returns: The fixture value. - ''' + """ + console.ensure_spawned() return console tests_not_run = set() tests_failed = set() +tests_xpassed = set() +tests_xfailed = set() tests_skipped = set() tests_passed = set() def pytest_itemcollected(item): - '''pytest hook: Called once for each test found during collection. + """pytest hook: Called once for each test found during collection. This enables our custom result analysis code to see the list of all tests that should eventually be run. @@ -256,12 +265,12 @@ def pytest_itemcollected(item): Returns: Nothing. - ''' + """ tests_not_run.add(item.name) def cleanup(): - '''Clean up all global state. + """Clean up all global state. Executed (via atexit) once the entire test process is complete. This includes logging the status of all tests, and the identity of any failed @@ -272,7 +281,7 @@ def cleanup(): Returns: Nothing. - ''' + """ if console: console.close() @@ -282,6 +291,14 @@ def cleanup(): log.status_skipped('%d skipped' % len(tests_skipped)) for test in tests_skipped: log.status_skipped('... ' + test) + if tests_xpassed: + log.status_xpass('%d xpass' % len(tests_xpassed)) + for test in tests_xpassed: + log.status_xpass('... ' + test) + if tests_xfailed: + log.status_xfail('%d xfail' % len(tests_xfailed)) + for test in tests_xfailed: + log.status_xfail('... ' + test) if tests_failed: log.status_fail('%d failed' % len(tests_failed)) for test in tests_failed: @@ -294,7 +311,7 @@ def cleanup(): atexit.register(cleanup) def setup_boardspec(item): - '''Process any 'boardspec' marker for a test. + """Process any 'boardspec' marker for a test. Such a marker lists the set of board types that a test does/doesn't support. If tests are being executed on an unsupported board, the test is @@ -305,7 +322,7 @@ def setup_boardspec(item): Returns: Nothing. - ''' + """ mark = item.get_marker('boardspec') if not mark: @@ -322,7 +339,7 @@ def setup_boardspec(item): pytest.skip('board not supported') def setup_buildconfigspec(item): - '''Process any 'buildconfigspec' marker for a test. + """Process any 'buildconfigspec' marker for a test. Such a marker lists some U-Boot configuration feature that the test requires. If tests are being executed on an U-Boot build that doesn't @@ -333,7 +350,7 @@ def setup_buildconfigspec(item): Returns: Nothing. - ''' + """ mark = item.get_marker('buildconfigspec') if not mark: @@ -343,7 +360,7 @@ def setup_buildconfigspec(item): pytest.skip('.config feature not enabled') def pytest_runtest_setup(item): - '''pytest hook: Configure (set up) a test item. + """pytest hook: Configure (set up) a test item. Called once for each test to perform any custom configuration. This hook is used to skip the test if certain conditions apply. @@ -353,14 +370,14 @@ def pytest_runtest_setup(item): Returns: Nothing. - ''' + """ log.start_section(item.name) setup_boardspec(item) setup_buildconfigspec(item) def pytest_runtest_protocol(item, nextitem): - '''pytest hook: Called to execute a test. + """pytest hook: Called to execute a test. This hook wraps the standard pytest runtestprotocol() function in order to acquire visibility into, and record, each test function's result. @@ -371,36 +388,45 @@ def pytest_runtest_protocol(item, nextitem): Returns: A list of pytest reports (test result data). - ''' + """ reports = runtestprotocol(item, nextitem=nextitem) - failed = None - skipped = None + + failure_cleanup = False + test_list = tests_passed + msg = 'OK' + msg_log = log.status_pass for report in reports: if report.outcome == 'failed': - failed = report + if hasattr(report, 'wasxfail'): + test_list = tests_xpassed + msg = 'XPASSED' + msg_log = log.status_xpass + else: + failure_cleanup = True + test_list = tests_failed + msg = 'FAILED:\n' + str(report.longrepr) + msg_log = log.status_fail break if report.outcome == 'skipped': - if not skipped: - skipped = report - - if failed: - tests_failed.add(item.name) - elif skipped: - tests_skipped.add(item.name) - else: - tests_passed.add(item.name) + if hasattr(report, 'wasxfail'): + failure_cleanup = True + test_list = tests_xfailed + msg = 'XFAILED:\n' + str(report.longrepr) + msg_log = log.status_xfail + break + test_list = tests_skipped + msg = 'SKIPPED:\n' + str(report.longrepr) + msg_log = log.status_skipped + + if failure_cleanup: + console.drain_console() + + test_list.add(item.name) tests_not_run.remove(item.name) try: - if failed: - msg = 'FAILED:\n' + str(failed.longrepr) - log.status_fail(msg) - elif skipped: - msg = 'SKIPPED:\n' + str(skipped.longrepr) - log.status_skipped(msg) - else: - log.status_pass('OK') + msg_log(msg) except: # If something went wrong with logging, it's better to let the test # process continue, which may report other exceptions that triggered @@ -416,7 +442,7 @@ def pytest_runtest_protocol(item, nextitem): log.end_section(item.name) - if failed: + if failure_cleanup: console.cleanup_spawn() return reports diff --git a/test/py/multiplexed_log.css b/test/py/multiplexed_log.css index 50f7b90..f6240d5 100644 --- a/test/py/multiplexed_log.css +++ b/test/py/multiplexed_log.css @@ -83,6 +83,14 @@ pre { color: #ffff00 } +.status-xfail { + color: #ff7f00 +} + +.status-xpass { + color: #ff7f00 +} + .status-fail { color: #ff0000 } diff --git a/test/py/multiplexed_log.py b/test/py/multiplexed_log.py index 48f2b51..69a577e 100644 --- a/test/py/multiplexed_log.py +++ b/test/py/multiplexed_log.py @@ -14,12 +14,12 @@ import subprocess mod_dir = os.path.dirname(os.path.abspath(__file__)) class LogfileStream(object): - '''A file-like object used to write a single logical stream of data into + """A file-like object used to write a single logical stream of data into a multiplexed log file. Objects of this type should be created by factory - functions in the Logfile class rather than directly.''' + functions in the Logfile class rather than directly.""" def __init__(self, logfile, name, chained_file): - '''Initialize a new object. + """Initialize a new object. Args: logfile: The Logfile object to log to. @@ -29,26 +29,26 @@ class LogfileStream(object): Returns: Nothing. - ''' + """ self.logfile = logfile self.name = name self.chained_file = chained_file def close(self): - '''Dummy function so that this class is "file-like". + """Dummy function so that this class is "file-like". Args: None. Returns: Nothing. - ''' + """ pass def write(self, data, implicit=False): - '''Write data to the log stream. + """Write data to the log stream. Args: data: The data to write tot he file. @@ -60,33 +60,33 @@ class LogfileStream(object): Returns: Nothing. - ''' + """ self.logfile.write(self, data, implicit) if self.chained_file: self.chained_file.write(data) def flush(self): - '''Flush the log stream, to ensure correct log interleaving. + """Flush the log stream, to ensure correct log interleaving. Args: None. Returns: Nothing. - ''' + """ self.logfile.flush() if self.chained_file: self.chained_file.flush() class RunAndLog(object): - '''A utility object used to execute sub-processes and log their output to + """A utility object used to execute sub-processes and log their output to a multiplexed log file. Objects of this type should be created by factory - functions in the Logfile class rather than directly.''' + functions in the Logfile class rather than directly.""" def __init__(self, logfile, name, chained_file): - '''Initialize a new object. + """Initialize a new object. Args: logfile: The Logfile object to log to. @@ -96,29 +96,33 @@ class RunAndLog(object): Returns: Nothing. - ''' + """ self.logfile = logfile self.name = name self.chained_file = chained_file def close(self): - '''Clean up any resources managed by this object.''' + """Clean up any resources managed by this object.""" pass - def run(self, cmd, cwd=None): - '''Run a command as a sub-process, and log the results. + def run(self, cmd, cwd=None, ignore_errors=False): + """Run a command as a sub-process, and log the results. Args: cmd: The command to execute. cwd: The directory to run the command in. Can be None to use the current directory. + ignore_errors: Indicate whether to ignore errors. If True, the + function will simply return if the command cannot be executed + or exits with an error code, otherwise an exception will be + raised if such problems occur. Returns: Nothing. - ''' + """ - msg = "+" + " ".join(cmd) + "\n" + msg = '+' + ' '.join(cmd) + '\n' if self.chained_file: self.chained_file.write(msg) self.logfile.write(self, msg) @@ -148,7 +152,7 @@ class RunAndLog(object): exception = e if output and not output.endswith('\n'): output += '\n' - if exit_status and not exception: + if exit_status and not exception and not ignore_errors: exception = Exception('Exit code: ' + str(exit_status)) if exception: output += str(exception) + '\n' @@ -159,13 +163,13 @@ class RunAndLog(object): raise exception class SectionCtxMgr(object): - '''A context manager for Python's "with" statement, which allows a certain + """A context manager for Python's "with" statement, which allows a certain portion of test code to be logged to a separate section of the log file. Objects of this type should be created by factory functions in the Logfile - class rather than directly.''' + class rather than directly.""" def __init__(self, log, marker): - '''Initialize a new object. + """Initialize a new object. Args: log: The Logfile object to log to. @@ -173,7 +177,7 @@ class SectionCtxMgr(object): Returns: Nothing. - ''' + """ self.log = log self.marker = marker @@ -185,35 +189,35 @@ class SectionCtxMgr(object): self.log.end_section(self.marker) class Logfile(object): - '''Generates an HTML-formatted log file containing multiple streams of - data, each represented in a well-delineated/-structured fashion.''' + """Generates an HTML-formatted log file containing multiple streams of + data, each represented in a well-delineated/-structured fashion.""" def __init__(self, fn): - '''Initialize a new object. + """Initialize a new object. Args: fn: The filename to write to. Returns: Nothing. - ''' + """ - self.f = open(fn, "wt") + self.f = open(fn, 'wt') self.last_stream = None self.blocks = [] self.cur_evt = 1 - shutil.copy(mod_dir + "/multiplexed_log.css", os.path.dirname(fn)) - self.f.write("""\ + shutil.copy(mod_dir + '/multiplexed_log.css', os.path.dirname(fn)) + self.f.write('''\ <html> <head> <link rel="stylesheet" type="text/css" href="multiplexed_log.css"> </head> <body> <tt> -""") +''') def close(self): - '''Close the log file. + """Close the log file. After calling this function, no more data may be written to the log. @@ -222,22 +226,22 @@ class Logfile(object): Returns: Nothing. - ''' + """ - self.f.write("""\ + self.f.write('''\ </tt> </body> </html> -""") +''') self.f.close() # The set of characters that should be represented as hexadecimal codes in # the log file. - _nonprint = ("%" + "".join(chr(c) for c in range(0, 32) if c not in (9, 10)) + - "".join(chr(c) for c in range(127, 256))) + _nonprint = ('%' + ''.join(chr(c) for c in range(0, 32) if c not in (9, 10)) + + ''.join(chr(c) for c in range(127, 256))) def _escape(self, data): - '''Render data format suitable for inclusion in an HTML document. + """Render data format suitable for inclusion in an HTML document. This includes HTML-escaping certain characters, and translating control characters to a hexadecimal representation. @@ -247,36 +251,36 @@ class Logfile(object): Returns: An escaped version of the data. - ''' + """ - data = data.replace(chr(13), "") - data = "".join((c in self._nonprint) and ("%%%02x" % ord(c)) or + data = data.replace(chr(13), '') + data = ''.join((c in self._nonprint) and ('%%%02x' % ord(c)) or c for c in data) data = cgi.escape(data) return data def _terminate_stream(self): - '''Write HTML to the log file to terminate the current stream's data. + """Write HTML to the log file to terminate the current stream's data. Args: None. Returns: Nothing. - ''' + """ self.cur_evt += 1 if not self.last_stream: return - self.f.write("</pre>\n") - self.f.write("<div class=\"stream-trailer\" id=\"" + - self.last_stream.name + "\">End stream: " + - self.last_stream.name + "</div>\n") - self.f.write("</div>\n") + self.f.write('</pre>\n') + self.f.write('<div class="stream-trailer" id="' + + self.last_stream.name + '">End stream: ' + + self.last_stream.name + '</div>\n') + self.f.write('</div>\n') self.last_stream = None def _note(self, note_type, msg): - '''Write a note or one-off message to the log file. + """Write a note or one-off message to the log file. Args: note_type: The type of note. This must be a value supported by the @@ -285,32 +289,32 @@ class Logfile(object): Returns: Nothing. - ''' + """ self._terminate_stream() - self.f.write("<div class=\"" + note_type + "\">\n<pre>") + self.f.write('<div class="' + note_type + '">\n<pre>') self.f.write(self._escape(msg)) - self.f.write("\n</pre></div>\n") + self.f.write('\n</pre></div>\n') def start_section(self, marker): - '''Begin a new nested section in the log file. + """Begin a new nested section in the log file. Args: marker: The name of the section that is starting. Returns: Nothing. - ''' + """ self._terminate_stream() self.blocks.append(marker) - blk_path = "/".join(self.blocks) - self.f.write("<div class=\"section\" id=\"" + blk_path + "\">\n") - self.f.write("<div class=\"section-header\" id=\"" + blk_path + - "\">Section: " + blk_path + "</div>\n") + blk_path = '/'.join(self.blocks) + self.f.write('<div class="section" id="' + blk_path + '">\n') + self.f.write('<div class="section-header" id="' + blk_path + + '">Section: ' + blk_path + '</div>\n') def end_section(self, marker): - '''Terminate the current nested section in the log file. + """Terminate the current nested section in the log file. This function validates proper nesting of start_section() and end_section() calls. If a mismatch is found, an exception is raised. @@ -320,20 +324,20 @@ class Logfile(object): Returns: Nothing. - ''' + """ if (not self.blocks) or (marker != self.blocks[-1]): - raise Exception("Block nesting mismatch: \"%s\" \"%s\"" % - (marker, "/".join(self.blocks))) + raise Exception('Block nesting mismatch: "%s" "%s"' % + (marker, '/'.join(self.blocks))) self._terminate_stream() - blk_path = "/".join(self.blocks) - self.f.write("<div class=\"section-trailer\" id=\"section-trailer-" + - blk_path + "\">End section: " + blk_path + "</div>\n") - self.f.write("</div>\n") + blk_path = '/'.join(self.blocks) + self.f.write('<div class="section-trailer" id="section-trailer-' + + blk_path + '">End section: ' + blk_path + '</div>\n') + self.f.write('</div>\n') self.blocks.pop() def section(self, marker): - '''Create a temporary section in the log file. + """Create a temporary section in the log file. This function creates a context manager for Python's "with" statement, which allows a certain portion of test code to be logged to a separate @@ -348,96 +352,120 @@ class Logfile(object): Returns: A context manager object. - ''' + """ return SectionCtxMgr(self, marker) def error(self, msg): - '''Write an error note to the log file. + """Write an error note to the log file. Args: msg: A message describing the error. Returns: Nothing. - ''' + """ self._note("error", msg) def warning(self, msg): - '''Write an warning note to the log file. + """Write an warning note to the log file. Args: msg: A message describing the warning. Returns: Nothing. - ''' + """ self._note("warning", msg) def info(self, msg): - '''Write an informational note to the log file. + """Write an informational note to the log file. Args: msg: An informational message. Returns: Nothing. - ''' + """ self._note("info", msg) def action(self, msg): - '''Write an action note to the log file. + """Write an action note to the log file. Args: msg: A message describing the action that is being logged. Returns: Nothing. - ''' + """ self._note("action", msg) def status_pass(self, msg): - '''Write a note to the log file describing test(s) which passed. + """Write a note to the log file describing test(s) which passed. Args: - msg: A message describing passed test(s). + msg: A message describing the passed test(s). Returns: Nothing. - ''' + """ self._note("status-pass", msg) def status_skipped(self, msg): - '''Write a note to the log file describing skipped test(s). + """Write a note to the log file describing skipped test(s). Args: - msg: A message describing passed test(s). + msg: A message describing the skipped test(s). Returns: Nothing. - ''' + """ self._note("status-skipped", msg) + def status_xfail(self, msg): + """Write a note to the log file describing xfailed test(s). + + Args: + msg: A message describing the xfailed test(s). + + Returns: + Nothing. + """ + + self._note("status-xfail", msg) + + def status_xpass(self, msg): + """Write a note to the log file describing xpassed test(s). + + Args: + msg: A message describing the xpassed test(s). + + Returns: + Nothing. + """ + + self._note("status-xpass", msg) + def status_fail(self, msg): - '''Write a note to the log file describing failed test(s). + """Write a note to the log file describing failed test(s). Args: - msg: A message describing passed test(s). + msg: A message describing the failed test(s). Returns: Nothing. - ''' + """ self._note("status-fail", msg) def get_stream(self, name, chained_file=None): - '''Create an object to log a single stream's data into the log file. + """Create an object to log a single stream's data into the log file. This creates a "file-like" object that can be written to in order to write a single stream's data to the log file. The implementation will @@ -452,12 +480,12 @@ class Logfile(object): Returns: A file-like object. - ''' + """ return LogfileStream(self, name, chained_file) def get_runner(self, name, chained_file=None): - '''Create an object that executes processes and logs their output. + """Create an object that executes processes and logs their output. Args: name: The name of this sub-process. @@ -466,12 +494,12 @@ class Logfile(object): Returns: A RunAndLog object. - ''' + """ return RunAndLog(self, name, chained_file) def write(self, stream, data, implicit=False): - '''Write stream data into the log file. + """Write stream data into the log file. This function should only be used by instances of LogfileStream or RunAndLog. @@ -487,29 +515,29 @@ class Logfile(object): Returns: Nothing. - ''' + """ if stream != self.last_stream: self._terminate_stream() - self.f.write("<div class=\"stream\" id=\"%s\">\n" % stream.name) - self.f.write("<div class=\"stream-header\" id=\"" + stream.name + - "\">Stream: " + stream.name + "</div>\n") - self.f.write("<pre>") + self.f.write('<div class="stream" id="%s">\n' % stream.name) + self.f.write('<div class="stream-header" id="' + stream.name + + '">Stream: ' + stream.name + '</div>\n') + self.f.write('<pre>') if implicit: - self.f.write("<span class=\"implicit\">") + self.f.write('<span class="implicit">') self.f.write(self._escape(data)) if implicit: - self.f.write("</span>") + self.f.write('</span>') self.last_stream = stream def flush(self): - '''Flush the log stream, to ensure correct log interleaving. + """Flush the log stream, to ensure correct log interleaving. Args: None. Returns: Nothing. - ''' + """ self.f.flush() diff --git a/test/py/test.py b/test/py/test.py index 9c23898..95671d4 100755 --- a/test/py/test.py +++ b/test/py/test.py @@ -16,17 +16,17 @@ import sys sys.argv.pop(0) # argv; py.test test_directory_name user-supplied-arguments -args = ["py.test", os.path.dirname(__file__) + "/tests"] +args = ['py.test', os.path.dirname(__file__) + '/tests'] args.extend(sys.argv) try: - os.execvp("py.test", args) + os.execvp('py.test', args) except: # Log full details of any exception for detailed analysis import traceback traceback.print_exc() # Hint to the user that they likely simply haven't installed the required # dependencies. - print >>sys.stderr, """ + print >>sys.stderr, ''' exec(py.test) failed; perhaps you are missing some dependencies? -See test/py/README.md for the list.""" +See test/py/README.md for the list.''' diff --git a/test/py/tests/test_000_version.py b/test/py/tests/test_000_version.py index d262f05..43a02e7 100644 --- a/test/py/tests/test_000_version.py +++ b/test/py/tests/test_000_version.py @@ -9,7 +9,7 @@ # command prompt. def test_version(u_boot_console): - '''Test that the "version" command prints the U-Boot version.''' + """Test that the "version" command prints the U-Boot version.""" # "version" prints the U-Boot sign-on message. This is usually considered # an error, so that any unexpected reboot causes an error. Here, this diff --git a/test/py/tests/test_dfu.py b/test/py/tests/test_dfu.py new file mode 100644 index 0000000..093e8d0 --- /dev/null +++ b/test/py/tests/test_dfu.py @@ -0,0 +1,279 @@ +# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. +# +# SPDX-License-Identifier: GPL-2.0 + +# Test U-Boot's "dfu" command. The test starts DFU in U-Boot, waits for USB +# device enumeration on the host, executes dfu-util multiple times to test +# various transfer sizes, many of which trigger USB driver edge cases, and +# finally aborts the "dfu" command in U-Boot. + +import os +import os.path +import pytest +import u_boot_utils + +""" +Note: This test relies on: + +a) boardenv_* to contain configuration values to define which USB ports are +available for testing. Without this, this test will be automatically skipped. +For example: + +env__usb_dev_ports = ( + { + "fixture_id": "micro_b", + "tgt_usb_ctlr": "0", + "host_usb_dev_node": "/dev/usbdev-p2371-2180", + # This parameter is optional /if/ you only have a single board + # attached to your host at a time. + "host_usb_port_path": "3-13", + }, +) + +env__dfu_configs = ( + # eMMC, partition 1 + { + "fixture_id": "emmc", + "alt_info": "/dfu_test.bin ext4 0 1;/dfu_dummy.bin ext4 0 1", + "cmd_params": "mmc 0", + # This value is optional. + # If present, it specified the set of transfer sizes tested. + # If missing, a default list of sizes will be used, which covers + # various useful corner cases. + # Manually specifying test sizes is useful if you wish to test 4 DFU + # configurations, but don't want to test every single transfer size + # on each, to avoid bloating the overall time taken by testing. + "test_sizes": (63, 64, 65), + }, +) + +b) udev rules to set permissions on devices nodes, so that sudo is not +required. For example: + +ACTION=="add", SUBSYSTEM=="block", SUBSYSTEMS=="usb", KERNELS=="3-13", MODE:="666" + +(You may wish to change the group ID instead of setting the permissions wide +open. All that matters is that the user ID running the test can access the +device.) +""" + +# The set of file sizes to test. These values trigger various edge-cases such +# as one less than, equal to, and one greater than typical USB max packet +# sizes, and similar boundary conditions. +test_sizes_default = ( + 64 - 1, + 64, + 64 + 1, + 128 - 1, + 128, + 128 + 1, + 960 - 1, + 960, + 960 + 1, + 4096 - 1, + 4096, + 4096 + 1, + 1024 * 1024 - 1, + 1024 * 1024, + 8 * 1024 * 1024, +) + +first_usb_dev_port = None + +@pytest.mark.buildconfigspec('cmd_dfu') +def test_dfu(u_boot_console, env__usb_dev_port, env__dfu_config): + """Test the "dfu" command; the host system must be able to enumerate a USB + device when "dfu" is running, various DFU transfers are tested, and the + USB device must disappear when "dfu" is aborted. + + Args: + u_boot_console: A U-Boot console connection. + env__usb_dev_port: The single USB device-mode port specification on + which to run the test. See the file-level comment above for + details of the format. + env__dfu_config: The single DFU (memory region) configuration on which + to run the test. See the file-level comment above for details + of the format. + + Returns: + Nothing. + """ + + def start_dfu(): + """Start U-Boot's dfu shell command. + + This also waits for the host-side USB enumeration process to complete. + + Args: + None. + + Returns: + Nothing. + """ + + fh = u_boot_utils.attempt_to_open_file( + env__usb_dev_port['host_usb_dev_node']) + if fh: + fh.close() + raise Exception('USB device present before dfu command invoked') + + u_boot_console.log.action( + 'Starting long-running U-Boot dfu shell command') + + cmd = 'setenv dfu_alt_info "%s"' % env__dfu_config['alt_info'] + u_boot_console.run_command(cmd) + + cmd = 'dfu 0 ' + env__dfu_config['cmd_params'] + u_boot_console.run_command(cmd, wait_for_prompt=False) + u_boot_console.log.action('Waiting for DFU USB device to appear') + fh = u_boot_utils.wait_until_open_succeeds( + env__usb_dev_port['host_usb_dev_node']) + fh.close() + + def stop_dfu(ignore_errors): + """Stop U-Boot's dfu shell command from executing. + + This also waits for the host-side USB de-enumeration process to + complete. + + Args: + ignore_errors: Ignore any errors. This is useful if an error has + already been detected, and the code is performing best-effort + cleanup. In this case, we do not want to mask the original + error by "honoring" any new errors. + + Returns: + Nothing. + """ + + try: + u_boot_console.log.action( + 'Stopping long-running U-Boot dfu shell command') + u_boot_console.ctrlc() + u_boot_console.log.action( + 'Waiting for DFU USB device to disappear') + u_boot_utils.wait_until_file_open_fails( + env__usb_dev_port['host_usb_dev_node'], ignore_errors) + except: + if not ignore_errors: + raise + + def run_dfu_util(alt_setting, fn, up_dn_load_arg): + """Invoke dfu-util on the host. + + Args: + alt_setting: The DFU "alternate setting" identifier to interact + with. + fn: The host-side file name to transfer. + up_dn_load_arg: '-U' or '-D' depending on whether a DFU upload or + download operation should be performed. + + Returns: + Nothing. + """ + + cmd = ['dfu-util', '-a', str(alt_setting), up_dn_load_arg, fn] + if 'host_usb_port_path' in env__usb_dev_port: + cmd += ['-p', env__usb_dev_port['host_usb_port_path']] + u_boot_utils.run_and_log(u_boot_console, cmd) + u_boot_console.wait_for('Ctrl+C to exit ...') + + def dfu_write(alt_setting, fn): + """Write a file to the target board using DFU. + + Args: + alt_setting: The DFU "alternate setting" identifier to interact + with. + fn: The host-side file name to transfer. + + Returns: + Nothing. + """ + + run_dfu_util(alt_setting, fn, '-D') + + def dfu_read(alt_setting, fn): + """Read a file from the target board using DFU. + + Args: + alt_setting: The DFU "alternate setting" identifier to interact + with. + fn: The host-side file name to transfer. + + Returns: + Nothing. + """ + + # dfu-util fails reads/uploads if the host file already exists + if os.path.exists(fn): + os.remove(fn) + run_dfu_util(alt_setting, fn, '-U') + + def dfu_write_read_check(size): + """Test DFU transfers of a specific size of data + + This function first writes data to the board then reads it back and + compares the written and read back data. Measures are taken to avoid + certain types of false positives. + + Args: + size: The data size to test. + + Returns: + Nothing. + """ + + test_f = u_boot_utils.PersistentRandomFile(u_boot_console, + 'dfu_%d.bin' % size, size) + readback_fn = u_boot_console.config.result_dir + '/dfu_readback.bin' + + u_boot_console.log.action('Writing test data to DFU primary ' + + 'altsetting') + dfu_write(0, test_f.abs_fn) + + u_boot_console.log.action('Writing dummy data to DFU secondary ' + + 'altsetting to clear DFU buffers') + dfu_write(1, dummy_f.abs_fn) + + u_boot_console.log.action('Reading DFU primary altsetting for ' + + 'comparison') + dfu_read(0, readback_fn) + + u_boot_console.log.action('Comparing written and read data') + written_hash = test_f.content_hash + read_back_hash = u_boot_utils.md5sum_file(readback_fn, size) + assert(written_hash == read_back_hash) + + # This test may be executed against multiple USB ports. The test takes a + # long time, so we don't want to do the whole thing each time. Instead, + # execute the full test on the first USB port, and perform a very limited + # test on other ports. In the limited case, we solely validate that the + # host PC can enumerate the U-Boot USB device. + global first_usb_dev_port + if not first_usb_dev_port: + first_usb_dev_port = env__usb_dev_port + if env__usb_dev_port == first_usb_dev_port: + sizes = env__dfu_config.get('test_sizes', test_sizes_default) + else: + sizes = [] + + dummy_f = u_boot_utils.PersistentRandomFile(u_boot_console, + 'dfu_dummy.bin', 1024) + + ignore_cleanup_errors = True + try: + start_dfu() + + u_boot_console.log.action( + 'Overwriting DFU primary altsetting with dummy data') + dfu_write(0, dummy_f.abs_fn) + + for size in sizes: + with u_boot_console.log.section('Data size %d' % size): + dfu_write_read_check(size) + # Make the status of each sub-test obvious. If the test didn't + # pass, an exception was thrown so this code isn't executed. + u_boot_console.log.status_pass('OK') + ignore_cleanup_errors = False + finally: + stop_dfu(ignore_cleanup_errors) diff --git a/test/py/tests/test_env.py b/test/py/tests/test_env.py index a3e8dd3..c41aa5a 100644 --- a/test/py/tests/test_env.py +++ b/test/py/tests/test_env.py @@ -10,34 +10,34 @@ import pytest # FIXME: This might be useful for other tests; # perhaps refactor it into ConsoleBase or some other state object? class StateTestEnv(object): - '''Container that represents the state of all U-Boot environment variables. + """Container that represents the state of all U-Boot environment variables. This enables quick determination of existant/non-existant variable names. - ''' + """ def __init__(self, u_boot_console): - '''Initialize a new StateTestEnv object. + """Initialize a new StateTestEnv object. Args: u_boot_console: A U-Boot console. Returns: Nothing. - ''' + """ self.u_boot_console = u_boot_console self.get_env() self.set_var = self.get_non_existent_var() def get_env(self): - '''Read all current environment variables from U-Boot. + """Read all current environment variables from U-Boot. Args: None. Returns: Nothing. - ''' + """ response = self.u_boot_console.run_command('printenv') self.env = {} @@ -48,27 +48,27 @@ class StateTestEnv(object): self.env[var] = value def get_existent_var(self): - '''Return the name of an environment variable that exists. + """Return the name of an environment variable that exists. Args: None. Returns: The name of an environment variable. - ''' + """ for var in self.env: return var def get_non_existent_var(self): - '''Return the name of an environment variable that does not exist. + """Return the name of an environment variable that does not exist. Args: None. Returns: The name of an environment variable. - ''' + """ n = 0 while True: @@ -77,63 +77,67 @@ class StateTestEnv(object): return var n += 1 -@pytest.fixture(scope='module') +ste = None +@pytest.fixture(scope='function') def state_test_env(u_boot_console): - '''pytest fixture to provide a StateTestEnv object to tests.''' + """pytest fixture to provide a StateTestEnv object to tests.""" - return StateTestEnv(u_boot_console) + global ste + if not ste: + ste = StateTestEnv(u_boot_console) + return ste def unset_var(state_test_env, var): - '''Unset an environment variable. + """Unset an environment variable. This both executes a U-Boot shell command and updates a StateTestEnv object. Args: - state_test_env: The StateTestEnv object to updata. + state_test_env: The StateTestEnv object to update. var: The variable name to unset. Returns: Nothing. - ''' + """ state_test_env.u_boot_console.run_command('setenv %s' % var) if var in state_test_env.env: del state_test_env.env[var] def set_var(state_test_env, var, value): - '''Set an environment variable. + """Set an environment variable. This both executes a U-Boot shell command and updates a StateTestEnv object. Args: - state_test_env: The StateTestEnv object to updata. + state_test_env: The StateTestEnv object to update. var: The variable name to set. value: The value to set the variable to. Returns: Nothing. - ''' + """ state_test_env.u_boot_console.run_command('setenv %s "%s"' % (var, value)) state_test_env.env[var] = value def validate_empty(state_test_env, var): - '''Validate that a variable is not set, using U-Boot shell commands. + """Validate that a variable is not set, using U-Boot shell commands. Args: var: The variable name to test. Returns: Nothing. - ''' + """ response = state_test_env.u_boot_console.run_command('echo $%s' % var) assert response == '' def validate_set(state_test_env, var, value): - '''Validate that a variable is set, using U-Boot shell commands. + """Validate that a variable is set, using U-Boot shell commands. Args: var: The variable name to test. @@ -141,7 +145,7 @@ def validate_set(state_test_env, var, value): Returns: Nothing. - ''' + """ # echo does not preserve leading, internal, or trailing whitespace in the # value. printenv does, and hence allows more complete testing. @@ -149,20 +153,20 @@ def validate_set(state_test_env, var, value): assert response == ('%s=%s' % (var, value)) def test_env_echo_exists(state_test_env): - '''Test echoing a variable that exists.''' + """Test echoing a variable that exists.""" var = state_test_env.get_existent_var() value = state_test_env.env[var] validate_set(state_test_env, var, value) def test_env_echo_non_existent(state_test_env): - '''Test echoing a variable that doesn't exist.''' + """Test echoing a variable that doesn't exist.""" var = state_test_env.set_var validate_empty(state_test_env, var) def test_env_printenv_non_existent(state_test_env): - '''Test printenv error message for non-existant variables.''' + """Test printenv error message for non-existant variables.""" var = state_test_env.set_var c = state_test_env.u_boot_console @@ -171,14 +175,14 @@ def test_env_printenv_non_existent(state_test_env): assert(response == '## Error: "%s" not defined' % var) def test_env_unset_non_existent(state_test_env): - '''Test unsetting a nonexistent variable.''' + """Test unsetting a nonexistent variable.""" var = state_test_env.get_non_existent_var() unset_var(state_test_env, var) validate_empty(state_test_env, var) def test_env_set_non_existent(state_test_env): - '''Test set a non-existant variable.''' + """Test set a non-existant variable.""" var = state_test_env.set_var value = 'foo' @@ -186,7 +190,7 @@ def test_env_set_non_existent(state_test_env): validate_set(state_test_env, var, value) def test_env_set_existing(state_test_env): - '''Test setting an existant variable.''' + """Test setting an existant variable.""" var = state_test_env.set_var value = 'bar' @@ -194,14 +198,14 @@ def test_env_set_existing(state_test_env): validate_set(state_test_env, var, value) def test_env_unset_existing(state_test_env): - '''Test unsetting a variable.''' + """Test unsetting a variable.""" var = state_test_env.set_var unset_var(state_test_env, var) validate_empty(state_test_env, var) def test_env_expansion_spaces(state_test_env): - '''Test expanding a variable that contains a space in its value.''' + """Test expanding a variable that contains a space in its value.""" var_space = None var_test = None diff --git a/test/py/tests/test_help.py b/test/py/tests/test_help.py index 894f3b5..420090c 100644 --- a/test/py/tests/test_help.py +++ b/test/py/tests/test_help.py @@ -4,6 +4,6 @@ # SPDX-License-Identifier: GPL-2.0 def test_help(u_boot_console): - '''Test that the "help" command can be executed.''' + """Test that the "help" command can be executed.""" u_boot_console.run_command('help') diff --git a/test/py/tests/test_hush_if_test.py b/test/py/tests/test_hush_if_test.py index cf4c3ae..8b88425 100644 --- a/test/py/tests/test_hush_if_test.py +++ b/test/py/tests/test_hush_if_test.py @@ -95,7 +95,7 @@ subtests = ( ) def exec_hush_if(u_boot_console, expr, result): - '''Execute a shell "if" command, and validate its result.''' + """Execute a shell "if" command, and validate its result.""" cmd = 'if ' + expr + '; then echo true; else echo false; fi' response = u_boot_console.run_command(cmd) @@ -103,7 +103,7 @@ def exec_hush_if(u_boot_console, expr, result): @pytest.mark.buildconfigspec('sys_hush_parser') def test_hush_if_test_setup(u_boot_console): - '''Set up environment variables used during the "if" tests.''' + """Set up environment variables used during the "if" tests.""" u_boot_console.run_command('setenv ut_var_nonexistent') u_boot_console.run_command('setenv ut_var_exists 1') @@ -111,13 +111,13 @@ def test_hush_if_test_setup(u_boot_console): @pytest.mark.buildconfigspec('sys_hush_parser') @pytest.mark.parametrize('expr,result', subtests) def test_hush_if_test(u_boot_console, expr, result): - '''Test a single "if test" condition.''' + """Test a single "if test" condition.""" exec_hush_if(u_boot_console, expr, result) @pytest.mark.buildconfigspec('sys_hush_parser') def test_hush_if_test_teardown(u_boot_console): - '''Clean up environment variables used during the "if" tests.''' + """Clean up environment variables used during the "if" tests.""" u_boot_console.run_command('setenv ut_var_exists') @@ -126,7 +126,7 @@ def test_hush_if_test_teardown(u_boot_console): # Of those, only UMS currently allows file removal though. @pytest.mark.boardspec('sandbox') def test_hush_if_test_host_file_exists(u_boot_console): - '''Test the "if test -e" shell command.''' + """Test the "if test -e" shell command.""" test_file = u_boot_console.config.result_dir + \ '/creating_this_file_breaks_u_boot_tests' diff --git a/test/py/tests/test_md.py b/test/py/tests/test_md.py index 94603c7..5fe2582 100644 --- a/test/py/tests/test_md.py +++ b/test/py/tests/test_md.py @@ -4,13 +4,14 @@ # SPDX-License-Identifier: GPL-2.0 import pytest +import u_boot_utils @pytest.mark.buildconfigspec('cmd_memory') def test_md(u_boot_console): - '''Test that md reads memory as expected, and that memory can be modified - using the mw command.''' + """Test that md reads memory as expected, and that memory can be modified + using the mw command.""" - ram_base = u_boot_console.find_ram_base() + ram_base = u_boot_utils.find_ram_base(u_boot_console) addr = '%08x' % ram_base val = 'a5f09876' expected_response = addr + ': ' + val @@ -23,10 +24,10 @@ def test_md(u_boot_console): @pytest.mark.buildconfigspec('cmd_memory') def test_md_repeat(u_boot_console): - '''Test command repeat (via executing an empty command) operates correctly - for "md"; the command must repeat and dump an incrementing address.''' + """Test command repeat (via executing an empty command) operates correctly + for "md"; the command must repeat and dump an incrementing address.""" - ram_base = u_boot_console.find_ram_base() + ram_base = u_boot_utils.find_ram_base(u_boot_console) addr_base = '%08x' % ram_base words = 0x10 addr_repeat = '%08x' % (ram_base + (words * 4)) diff --git a/test/py/tests/test_net.py b/test/py/tests/test_net.py new file mode 100644 index 0000000..07393eb --- /dev/null +++ b/test/py/tests/test_net.py @@ -0,0 +1,155 @@ +# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. +# +# SPDX-License-Identifier: GPL-2.0 + +# Test various network-related functionality, such as the dhcp, ping, and +# tftpboot commands. + +import pytest +import u_boot_utils + +""" +Note: This test relies on boardenv_* containing configuration values to define +which the network environment available for testing. Without this, this test +will be automatically skipped. + +For example: + +# Boolean indicating whether the Ethernet device is attached to USB, and hence +# USB enumeration needs to be performed prior to network tests. +# This variable may be omitted if its value is False. +env__net_uses_usb = False + +# Boolean indicating whether the Ethernet device is attached to PCI, and hence +# PCI enumeration needs to be performed prior to network tests. +# This variable may be omitted if its value is False. +env__net_uses_pci = True + +# True if a DHCP server is attached to the network, and should be tested. +# If DHCP testing is not possible or desired, this variable may be omitted or +# set to False. +env__net_dhcp_server = True + +# A list of environment variables that should be set in order to configure a +# static IP. If solely relying on DHCP, this variable may be omitted or set to +# an empty list. +env__net_static_env_vars = [ + ("ipaddr", "10.0.0.100"), + ("netmask", "255.255.255.0"), + ("serverip", "10.0.0.1"), +] + +# Details regarding a file that may be read from a TFTP server. This variable +# may be omitted or set to None if TFTP testing is not possible or desired. +env__net_tftp_readable_file = { + "fn": "ubtest-readable.bin", + "size": 5058624, + "crc32": "c2244b26", +} +""" + +net_set_up = False + +def test_net_pre_commands(u_boot_console): + """Execute any commands required to enable network hardware. + + These commands are provided by the boardenv_* file; see the comment at the + beginning of this file. + """ + + init_usb = u_boot_console.config.env.get('env__net_uses_usb', False) + if init_usb: + u_boot_console.run_command('usb start') + + init_pci = u_boot_console.config.env.get('env__net_uses_pci', False) + if init_pci: + u_boot_console.run_command('pci enum') + +@pytest.mark.buildconfigspec('cmd_dhcp') +def test_net_dhcp(u_boot_console): + """Test the dhcp command. + + The boardenv_* file may be used to enable/disable this test; see the + comment at the beginning of this file. + """ + + test_dhcp = u_boot_console.config.env.get('env__net_dhcp_server', False) + if not test_dhcp: + pytest.skip('No DHCP server available') + + u_boot_console.run_command('setenv autoload no') + output = u_boot_console.run_command('dhcp') + assert 'DHCP client bound to address ' in output + + global net_set_up + net_set_up = True + +@pytest.mark.buildconfigspec('net') +def test_net_setup_static(u_boot_console): + """Set up a static IP configuration. + + The configuration is provided by the boardenv_* file; see the comment at + the beginning of this file. + """ + + env_vars = u_boot_console.config.env.get('env__net_static_env_vars', None) + if not env_vars: + pytest.skip('No static network configuration is defined') + + for (var, val) in env_vars: + u_boot_console.run_command('setenv %s %s' % (var, val)) + + global net_set_up + net_set_up = True + +@pytest.mark.buildconfigspec('cmd_ping') +def test_net_ping(u_boot_console): + """Test the ping command. + + The $serverip (as set up by either test_net_dhcp or test_net_setup_static) + is pinged. The test validates that the host is alive, as reported by the + ping command's output. + """ + + if not net_set_up: + pytest.skip('Network not initialized') + + output = u_boot_console.run_command('ping $serverip') + assert 'is alive' in output + +@pytest.mark.buildconfigspec('cmd_net') +def test_net_tftpboot(u_boot_console): + """Test the tftpboot command. + + A file is downloaded from the TFTP server, its size and optionally its + CRC32 are validated. + + The details of the file to download are provided by the boardenv_* file; + see the comment at the beginning of this file. + """ + + if not net_set_up: + pytest.skip('Network not initialized') + + f = u_boot_console.config.env.get('env__net_tftp_readable_file', None) + if not f: + pytest.skip('No TFTP readable file to read') + + addr = u_boot_utils.find_ram_base(u_boot_console) + fn = f['fn'] + output = u_boot_console.run_command('tftpboot %x %s' % (addr, fn)) + expected_text = 'Bytes transferred = ' + sz = f.get('size', None) + if sz: + expected_text += '%d' % sz + assert expected_text in output + + expected_crc = f.get('crc32', None) + if not expected_crc: + return + + if u_boot_console.config.buildconfig.get('config_cmd_crc32', 'n') != 'y': + return + + output = u_boot_console.run_command('crc32 %x $filesize' % addr) + assert expected_crc in output diff --git a/test/py/tests/test_sandbox_exit.py b/test/py/tests/test_sandbox_exit.py index 2aa8eb4..d1aa308 100644 --- a/test/py/tests/test_sandbox_exit.py +++ b/test/py/tests/test_sandbox_exit.py @@ -9,16 +9,14 @@ import signal @pytest.mark.boardspec('sandbox') @pytest.mark.buildconfigspec('reset') def test_reset(u_boot_console): - '''Test that the "reset" command exits sandbox process.''' + """Test that the "reset" command exits sandbox process.""" u_boot_console.run_command('reset', wait_for_prompt=False) assert(u_boot_console.validate_exited()) - u_boot_console.ensure_spawned() @pytest.mark.boardspec('sandbox') def test_ctrl_c(u_boot_console): - '''Test that sending SIGINT to sandbox causes it to exit.''' + """Test that sending SIGINT to sandbox causes it to exit.""" u_boot_console.kill(signal.SIGINT) assert(u_boot_console.validate_exited()) - u_boot_console.ensure_spawned() diff --git a/test/py/tests/test_shell_basics.py b/test/py/tests/test_shell_basics.py index 719ce61..702e5e2 100644 --- a/test/py/tests/test_shell_basics.py +++ b/test/py/tests/test_shell_basics.py @@ -5,13 +5,13 @@ # Test basic shell functionality, such as commands separate by semi-colons. def test_shell_execute(u_boot_console): - '''Test any shell command.''' + """Test any shell command.""" response = u_boot_console.run_command('echo hello') assert response.strip() == 'hello' def test_shell_semicolon_two(u_boot_console): - '''Test two shell commands separate by a semi-colon.''' + """Test two shell commands separate by a semi-colon.""" cmd = 'echo hello; echo world' response = u_boot_console.run_command(cmd) @@ -19,8 +19,8 @@ def test_shell_semicolon_two(u_boot_console): assert response.index('hello') < response.index('world') def test_shell_semicolon_three(u_boot_console): - '''Test three shell commands separate by a semi-colon, with variable - expansion dependencies between them.''' + """Test three shell commands separate by a semi-colon, with variable + expansion dependencies between them.""" cmd = 'setenv list 1; setenv list ${list}2; setenv list ${list}3; ' + \ 'echo ${list}' @@ -29,9 +29,9 @@ def test_shell_semicolon_three(u_boot_console): u_boot_console.run_command('setenv list') def test_shell_run(u_boot_console): - '''Test the "run" shell command.''' + """Test the "run" shell command.""" - u_boot_console.run_command('setenv foo \"setenv monty 1; setenv python 2\"') + u_boot_console.run_command('setenv foo "setenv monty 1; setenv python 2"') u_boot_console.run_command('run foo') response = u_boot_console.run_command('echo $monty') assert response.strip() == '1' diff --git a/test/py/tests/test_sleep.py b/test/py/tests/test_sleep.py index 64f1ddf..74add89 100644 --- a/test/py/tests/test_sleep.py +++ b/test/py/tests/test_sleep.py @@ -6,12 +6,8 @@ import pytest import time def test_sleep(u_boot_console): - '''Test the sleep command, and validate that it sleeps for approximately - the correct amount of time.''' - - # Do this before we time anything, to make sure U-Boot is already running. - # Otherwise, the system boot time is included in the time measurement. - u_boot_console.ensure_spawned() + """Test the sleep command, and validate that it sleeps for approximately + the correct amount of time.""" # 3s isn't too long, but is enough to cross a few second boundaries. sleep_time = 3 diff --git a/test/py/tests/test_ums.py b/test/py/tests/test_ums.py index a137221..8c3ee2b 100644 --- a/test/py/tests/test_ums.py +++ b/test/py/tests/test_ums.py @@ -2,28 +2,58 @@ # # SPDX-License-Identifier: GPL-2.0 -# Test U-Boot's "ums" command. At present, this test only ensures that a UMS -# device can be enumerated by the host/test machine. In the future, this test -# should be enhanced to validate disk IO. +# Test U-Boot's "ums" command. The test starts UMS in U-Boot, waits for USB +# device enumeration on the host, reads a small block of data from the UMS +# block device, optionally mounts a partition and performs filesystem-based +# read/write tests, and finally aborts the "ums" command in U-Boot. import os +import os.path import pytest +import re import time +import u_boot_utils -''' +""" Note: This test relies on: a) boardenv_* to contain configuration values to define which USB ports are available for testing. Without this, this test will be automatically skipped. For example: +# Leave this list empty if you have no block_devs below with writable +# partitions defined. +env__mount_points = ( + "/mnt/ubtest-mnt-p2371-2180-na", +) + env__usb_dev_ports = ( - {'tgt_usb_ctlr': '0', 'host_ums_dev_node': '/dev/disk/by-path/pci-0000:00:14.0-usb-0:13:1.0-scsi-0:0:0:0'}, + { + "fixture_id": "micro_b", + "tgt_usb_ctlr": "0", + "host_ums_dev_node": "/dev/disk/by-path/pci-0000:00:14.0-usb-0:13:1.0-scsi-0:0:0:0", + }, ) env__block_devs = ( - {'type': 'mmc', 'id': '0'}, # eMMC; always present - {'type': 'mmc', 'id': '1'}, # SD card; present since I plugged one in + # eMMC; always present + { + "fixture_id": "emmc", + "type": "mmc", + "id": "0", + # The following two properties are optional. + # If present, the partition will be mounted and a file written-to and + # read-from it. If missing, only a simple block read test will be + # performed. + "writable_fs_partition": 1, + "writable_fs_subdir": "tmp/", + }, + # SD card; present since I plugged one in + { + "fixture_id": "sd", + "type": "mmc", + "id": "1" + }, ) b) udev rules to set permissions on devices nodes, so that sudo is not @@ -34,47 +64,42 @@ ACTION=="add", SUBSYSTEM=="block", SUBSYSTEMS=="usb", KERNELS=="3-13", MODE:="66 (You may wish to change the group ID instead of setting the permissions wide open. All that matters is that the user ID running the test can access the device.) -''' -def open_ums_device(host_ums_dev_node): - '''Attempt to open a device node, returning either the opened file handle, - or None on any error.''' +c) /etc/fstab entries to allow the block device to be mounted without requiring +root permissions. For example: - try: - return open(host_ums_dev_node, 'rb') - except: - return None - -def wait_for_ums_device(host_ums_dev_node): - '''Continually attempt to open the device node exported by the "ums" - command, and either return the opened file handle, or raise an exception - after a timeout.''' - - for i in xrange(100): - fh = open_ums_device(host_ums_dev_node) - if fh: - return fh - time.sleep(0.1) - raise Exception('UMS device did not appear') - -def wait_for_ums_device_gone(host_ums_dev_node): - '''Continually attempt to open the device node exported by the "ums" - command, and either return once the device has disappeared, or raise an - exception if it does not before a timeout occurs.''' - - for i in xrange(100): - fh = open_ums_device(host_ums_dev_node) - if not fh: - return - fh.close() - time.sleep(0.1) - raise Exception('UMS device did not disappear') +/dev/disk/by-path/pci-0000:00:14.0-usb-0:13:1.0-scsi-0:0:0:0-part1 /mnt/ubtest-mnt-p2371-2180-na ext4 noauto,user,nosuid,nodev + +This entry is only needed if any block_devs above contain a +writable_fs_partition value. +""" @pytest.mark.buildconfigspec('cmd_usb_mass_storage') def test_ums(u_boot_console, env__usb_dev_port, env__block_devs): - '''Test the "ums" command; the host system must be able to enumerate a UMS - device when "ums" is running, and this device must disappear when "ums" is - aborted.''' + """Test the "ums" command; the host system must be able to enumerate a UMS + device when "ums" is running, block and optionally file I/O are tested, + and this device must disappear when "ums" is aborted. + + Args: + u_boot_console: A U-Boot console connection. + env__usb_dev_port: The single USB device-mode port specification on + which to run the test. See the file-level comment above for + details of the format. + env__block_devs: The list of block devices that the target U-Boot + device has attached. See the file-level comment above for details + of the format. + + Returns: + Nothing. + """ + + have_writable_fs_partition = 'writable_fs_partition' in env__block_devs[0] + if not have_writable_fs_partition: + # If 'writable_fs_subdir' is missing, we'll skip all parts of the + # testing which mount filesystems. + u_boot_console.log.warning( + 'boardenv missing "writable_fs_partition"; ' + + 'UMS testing will be limited.') tgt_usb_ctlr = env__usb_dev_port['tgt_usb_ctlr'] host_ums_dev_node = env__usb_dev_port['host_ums_dev_node'] @@ -84,11 +109,129 @@ def test_ums(u_boot_console, env__usb_dev_port, env__block_devs): # device list here. We'll test each block device somewhere else. tgt_dev_type = env__block_devs[0]['type'] tgt_dev_id = env__block_devs[0]['id'] + if have_writable_fs_partition: + mount_point = u_boot_console.config.env['env__mount_points'][0] + mount_subdir = env__block_devs[0]['writable_fs_subdir'] + part_num = env__block_devs[0]['writable_fs_partition'] + host_ums_part_node = '%s-part%d' % (host_ums_dev_node, part_num) + else: + host_ums_part_node = host_ums_dev_node + + test_f = u_boot_utils.PersistentRandomFile(u_boot_console, 'ums.bin', + 1024 * 1024); + if have_writable_fs_partition: + mounted_test_fn = mount_point + '/' + mount_subdir + test_f.fn + + def start_ums(): + """Start U-Boot's ums shell command. + + This also waits for the host-side USB enumeration process to complete. + + Args: + None. + + Returns: + Nothing. + """ + + u_boot_console.log.action( + 'Starting long-running U-Boot ums shell command') + cmd = 'ums %s %s %s' % (tgt_usb_ctlr, tgt_dev_type, tgt_dev_id) + u_boot_console.run_command(cmd, wait_for_prompt=False) + u_boot_console.wait_for(re.compile('UMS: LUN.*[\r\n]')) + fh = u_boot_utils.wait_until_open_succeeds(host_ums_part_node) + u_boot_console.log.action('Reading raw data from UMS device') + fh.read(4096) + fh.close() + + def mount(): + """Mount the block device that U-Boot exports. + + Args: + None. + + Returns: + Nothing. + """ + + u_boot_console.log.action('Mounting exported UMS device') + cmd = ('/bin/mount', host_ums_part_node) + u_boot_utils.run_and_log(u_boot_console, cmd) + + def umount(ignore_errors): + """Unmount the block device that U-Boot exports. - cmd = 'ums %s %s %s' % (tgt_usb_ctlr, tgt_dev_type, tgt_dev_id) - u_boot_console.run_command('ums 0 mmc 0', wait_for_prompt=False) - fh = wait_for_ums_device(host_ums_dev_node) - fh.read(4096) - fh.close() - u_boot_console.ctrlc() - wait_for_ums_device_gone(host_ums_dev_node) + Args: + ignore_errors: Ignore any errors. This is useful if an error has + already been detected, and the code is performing best-effort + cleanup. In this case, we do not want to mask the original + error by "honoring" any new errors. + + Returns: + Nothing. + """ + + u_boot_console.log.action('Unmounting UMS device') + cmd = ('/bin/umount', host_ums_part_node) + u_boot_utils.run_and_log(u_boot_console, cmd, ignore_errors) + + def stop_ums(ignore_errors): + """Stop U-Boot's ums shell command from executing. + + This also waits for the host-side USB de-enumeration process to + complete. + + Args: + ignore_errors: Ignore any errors. This is useful if an error has + already been detected, and the code is performing best-effort + cleanup. In this case, we do not want to mask the original + error by "honoring" any new errors. + + Returns: + Nothing. + """ + + u_boot_console.log.action( + 'Stopping long-running U-Boot ums shell command') + u_boot_console.ctrlc() + u_boot_utils.wait_until_file_open_fails(host_ums_part_node, + ignore_errors) + + ignore_cleanup_errors = True + try: + start_ums() + if not have_writable_fs_partition: + # Skip filesystem-based testing if not configured + return + try: + mount() + u_boot_console.log.action('Writing test file via UMS') + cmd = ('rm', '-f', mounted_test_fn) + u_boot_utils.run_and_log(u_boot_console, cmd) + if os.path.exists(mounted_test_fn): + raise Exception('Could not rm target UMS test file') + cmd = ('cp', test_f.abs_fn, mounted_test_fn) + u_boot_utils.run_and_log(u_boot_console, cmd) + ignore_cleanup_errors = False + finally: + umount(ignore_errors=ignore_cleanup_errors) + finally: + stop_ums(ignore_errors=ignore_cleanup_errors) + + ignore_cleanup_errors = True + try: + start_ums() + try: + mount() + u_boot_console.log.action('Reading test file back via UMS') + read_back_hash = u_boot_utils.md5sum_file(mounted_test_fn) + cmd = ('rm', '-f', mounted_test_fn) + u_boot_utils.run_and_log(u_boot_console, cmd) + ignore_cleanup_errors = False + finally: + umount(ignore_errors=ignore_cleanup_errors) + finally: + stop_ums(ignore_errors=ignore_cleanup_errors) + + written_hash = test_f.content_hash + assert(written_hash == read_back_hash) diff --git a/test/py/tests/test_unknown_cmd.py b/test/py/tests/test_unknown_cmd.py index 2de93e0..c27ab49 100644 --- a/test/py/tests/test_unknown_cmd.py +++ b/test/py/tests/test_unknown_cmd.py @@ -4,8 +4,8 @@ # SPDX-License-Identifier: GPL-2.0 def test_unknown_command(u_boot_console): - '''Test that executing an unknown command causes U-Boot to print an - error.''' + """Test that executing an unknown command causes U-Boot to print an + error.""" # The "unknown command" error is actively expected here, # so error detection for it is disabled. diff --git a/test/py/u_boot_console_base.py b/test/py/u_boot_console_base.py index 520f9a9..392f8cb 100644 --- a/test/py/u_boot_console_base.py +++ b/test/py/u_boot_console_base.py @@ -14,6 +14,7 @@ import os import pytest import re import sys +import u_boot_spawn # Regexes for text we expect U-Boot to send to the console. pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}-[^\r\n]*)') @@ -21,14 +22,27 @@ pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}-[^\r\n]*)') pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ') pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'') pattern_error_notification = re.compile('## Error: ') +pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###') + +PAT_ID = 0 +PAT_RE = 1 + +bad_pattern_defs = ( + ('spl_signon', pattern_u_boot_spl_signon), + ('main_signon', pattern_u_boot_main_signon), + ('stop_autoboot_prompt', pattern_stop_autoboot_prompt), + ('unknown_command', pattern_unknown_command), + ('error_notification', pattern_error_notification), + ('error_please_reset', pattern_error_please_reset), +) class ConsoleDisableCheck(object): - '''Context manager (for Python's with statement) that temporarily disables + """Context manager (for Python's with statement) that temporarily disables the specified console output error check. This is useful when deliberately executing a command that is known to trigger one of the error checks, in order to test that the error condition is actually raised. This class is used internally by ConsoleBase::disable_check(); it is not intended for - direct usage.''' + direct usage.""" def __init__(self, console, check_type): self.console = console @@ -36,18 +50,20 @@ class ConsoleDisableCheck(object): def __enter__(self): self.console.disable_check_count[self.check_type] += 1 + self.console.eval_bad_patterns() def __exit__(self, extype, value, traceback): self.console.disable_check_count[self.check_type] -= 1 + self.console.eval_bad_patterns() class ConsoleBase(object): - '''The interface through which test functions interact with the U-Boot + """The interface through which test functions interact with the U-Boot console. This primarily involves executing shell commands, capturing their results, and checking for common error conditions. Some common utilities - are also provided too.''' + are also provided too.""" def __init__(self, log, config, max_fifo_fill): - '''Initialize a U-Boot console connection. + """Initialize a U-Boot console connection. Can only usefully be called by sub-classes. @@ -64,7 +80,7 @@ class ConsoleBase(object): Returns: Nothing. - ''' + """ self.log = log self.config = config @@ -76,19 +92,20 @@ class ConsoleBase(object): self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1] self.prompt_escaped = re.escape(self.prompt) self.p = None - self.disable_check_count = { - 'spl_signon': 0, - 'main_signon': 0, - 'unknown_command': 0, - 'error_notification': 0, - } + self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs} + self.eval_bad_patterns() self.at_prompt = False self.at_prompt_logevt = None - self.ram_base = None + + def eval_bad_patterns(self): + self.bad_patterns = [pat[PAT_RE] for pat in bad_pattern_defs \ + if self.disable_check_count[pat[PAT_ID]] == 0] + self.bad_pattern_ids = [pat[PAT_ID] for pat in bad_pattern_defs \ + if self.disable_check_count[pat[PAT_ID]] == 0] def close(self): - '''Terminate the connection to the U-Boot console. + """Terminate the connection to the U-Boot console. This function is only useful once all interaction with U-Boot is complete. Once this function is called, data cannot be sent to or @@ -99,7 +116,7 @@ class ConsoleBase(object): Returns: Nothing. - ''' + """ if self.p: self.p.close() @@ -107,7 +124,7 @@ class ConsoleBase(object): def run_command(self, cmd, wait_for_echo=True, send_nl=True, wait_for_prompt=True): - '''Execute a command via the U-Boot console. + """Execute a command via the U-Boot console. The command is always sent to U-Boot. @@ -142,29 +159,12 @@ class ConsoleBase(object): The output from U-Boot during command execution. In other words, the text U-Boot emitted between the point it echod the command string and emitted the subsequent command prompts. - ''' - - self.ensure_spawned() + """ if self.at_prompt and \ self.at_prompt_logevt != self.logstream.logfile.cur_evt: self.logstream.write(self.prompt, implicit=True) - bad_patterns = [] - bad_pattern_ids = [] - if (self.disable_check_count['spl_signon'] == 0 and - self.u_boot_spl_signon): - bad_patterns.append(self.u_boot_spl_signon_escaped) - bad_pattern_ids.append('SPL signon') - if self.disable_check_count['main_signon'] == 0: - bad_patterns.append(self.u_boot_main_signon_escaped) - bad_pattern_ids.append('U-Boot main signon') - if self.disable_check_count['unknown_command'] == 0: - bad_patterns.append(pattern_unknown_command) - bad_pattern_ids.append('Unknown command') - if self.disable_check_count['error_notification'] == 0: - bad_patterns.append(pattern_error_notification) - bad_pattern_ids.append('Error notification') try: self.at_prompt = False if send_nl: @@ -178,18 +178,18 @@ class ConsoleBase(object): continue chunk = re.escape(chunk) chunk = chunk.replace('\\\n', '[\r\n]') - m = self.p.expect([chunk] + bad_patterns) + m = self.p.expect([chunk] + self.bad_patterns) if m != 0: self.at_prompt = False raise Exception('Bad pattern found on console: ' + - bad_pattern_ids[m - 1]) + self.bad_pattern_ids[m - 1]) if not wait_for_prompt: return - m = self.p.expect([self.prompt_escaped] + bad_patterns) + m = self.p.expect([self.prompt_escaped] + self.bad_patterns) if m != 0: self.at_prompt = False raise Exception('Bad pattern found on console: ' + - bad_pattern_ids[m - 1]) + self.bad_pattern_ids[m - 1]) self.at_prompt = True self.at_prompt_logevt = self.logstream.logfile.cur_evt # Only strip \r\n; space/TAB might be significant if testing @@ -201,7 +201,7 @@ class ConsoleBase(object): raise def ctrlc(self): - '''Send a CTRL-C character to U-Boot. + """Send a CTRL-C character to U-Boot. This is useful in order to stop execution of long-running synchronous commands such as "ums". @@ -211,12 +211,72 @@ class ConsoleBase(object): Returns: Nothing. - ''' + """ + self.log.action('Sending Ctrl-C') self.run_command(chr(3), wait_for_echo=False, send_nl=False) + def wait_for(self, text): + """Wait for a pattern to be emitted by U-Boot. + + This is useful when a long-running command such as "dfu" is executing, + and it periodically emits some text that should show up at a specific + location in the log file. + + Args: + text: The text to wait for; either a string (containing raw text, + not a regular expression) or an re object. + + Returns: + Nothing. + """ + + if type(text) == type(''): + text = re.escape(text) + m = self.p.expect([text] + self.bad_patterns) + if m != 0: + raise Exception('Bad pattern found on console: ' + + self.bad_pattern_ids[m - 1]) + + def drain_console(self): + """Read from and log the U-Boot console for a short time. + + U-Boot's console output is only logged when the test code actively + waits for U-Boot to emit specific data. There are cases where tests + can fail without doing this. For example, if a test asks U-Boot to + enable USB device mode, then polls until a host-side device node + exists. In such a case, it is useful to log U-Boot's console output + in case U-Boot printed clues as to why the host-side even did not + occur. This function will do that. + + Args: + None. + + Returns: + Nothing. + """ + + # If we are already not connected to U-Boot, there's nothing to drain. + # This should only happen when a previous call to run_command() or + # wait_for() failed (and hence the output has already been logged), or + # the system is shutting down. + if not self.p: + return + + orig_timeout = self.p.timeout + try: + # Drain the log for a relatively short time. + self.p.timeout = 1000 + # Wait for something U-Boot will likely never send. This will + # cause the console output to be read and logged. + self.p.expect(['This should never match U-Boot output']) + except u_boot_spawn.Timeout: + pass + finally: + self.p.timeout = orig_timeout + def ensure_spawned(self): - '''Ensure a connection to a correctly running U-Boot instance. + """Ensure a connection to a correctly running U-Boot instance. This may require spawning a new Sandbox process or resetting target hardware, as defined by the implementation sub-class. @@ -228,7 +288,7 @@ class ConsoleBase(object): Returns: Nothing. - ''' + """ if self.p: return @@ -243,26 +303,30 @@ class ConsoleBase(object): self.p.timeout = 30000 self.p.logfile_read = self.logstream if self.config.buildconfig.get('CONFIG_SPL', False) == 'y': - self.p.expect([pattern_u_boot_spl_signon]) - self.u_boot_spl_signon = self.p.after - self.u_boot_spl_signon_escaped = re.escape(self.p.after) - else: - self.u_boot_spl_signon = None - self.p.expect([pattern_u_boot_main_signon]) - self.u_boot_main_signon = self.p.after - self.u_boot_main_signon_escaped = re.escape(self.p.after) - build_idx = self.u_boot_main_signon.find(', Build:') + m = self.p.expect([pattern_u_boot_spl_signon] + self.bad_patterns) + if m != 0: + raise Exception('Bad pattern found on console: ' + + self.bad_pattern_ids[m - 1]) + m = self.p.expect([pattern_u_boot_main_signon] + self.bad_patterns) + if m != 0: + raise Exception('Bad pattern found on console: ' + + self.bad_pattern_ids[m - 1]) + signon = self.p.after + build_idx = signon.find(', Build:') if build_idx == -1: - self.u_boot_version_string = self.u_boot_main_signon + self.u_boot_version_string = signon else: - self.u_boot_version_string = self.u_boot_main_signon[:build_idx] + self.u_boot_version_string = signon[:build_idx] while True: - match = self.p.expect([self.prompt_escaped, - pattern_stop_autoboot_prompt]) - if match == 1: + m = self.p.expect([self.prompt_escaped, + pattern_stop_autoboot_prompt] + self.bad_patterns) + if m == 0: + break + if m == 1: self.p.send(chr(3)) # CTRL-C continue - break + raise Exception('Bad pattern found on console: ' + + self.bad_pattern_ids[m - 2]) self.at_prompt = True self.at_prompt_logevt = self.logstream.logfile.cur_evt except Exception as ex: @@ -271,7 +335,7 @@ class ConsoleBase(object): raise def cleanup_spawn(self): - '''Shut down all interaction with the U-Boot instance. + """Shut down all interaction with the U-Boot instance. This is used when an error is detected prior to re-establishing a connection with a fresh U-Boot instance. @@ -283,7 +347,7 @@ class ConsoleBase(object): Returns: Nothing. - ''' + """ try: if self.p: @@ -293,7 +357,7 @@ class ConsoleBase(object): self.p = None def validate_version_string_in_text(self, text): - '''Assert that a command's output includes the U-Boot signon message. + """Assert that a command's output includes the U-Boot signon message. This is primarily useful for validating the "version" command without duplicating the signon text regex in a test function. @@ -303,12 +367,12 @@ class ConsoleBase(object): Returns: Nothing. An exception is raised if the validation fails. - ''' + """ assert(self.u_boot_version_string in text) def disable_check(self, check_type): - '''Temporarily disable an error check of U-Boot's output. + """Temporarily disable an error check of U-Boot's output. Create a new context manager (for use with the "with" statement) which temporarily disables a particular console output error check. @@ -319,42 +383,6 @@ class ConsoleBase(object): Returns: A context manager object. - ''' + """ return ConsoleDisableCheck(self, check_type) - - def find_ram_base(self): - '''Find the running U-Boot's RAM location. - - Probe the running U-Boot to determine the address of the first bank - of RAM. This is useful for tests that test reading/writing RAM, or - load/save files that aren't associated with some standard address - typically represented in an environment variable such as - ${kernel_addr_r}. The value is cached so that it only needs to be - actively read once. - - Args: - None. - - Returns: - The address of U-Boot's first RAM bank, as an integer. - ''' - - if self.config.buildconfig.get('config_cmd_bdi', 'n') != 'y': - pytest.skip('bdinfo command not supported') - if self.ram_base == -1: - pytest.skip('Previously failed to find RAM bank start') - if self.ram_base is not None: - return self.ram_base - - with self.log.section('find_ram_base'): - response = self.run_command('bdinfo') - for l in response.split('\n'): - if '-> start' in l: - self.ram_base = int(l.split('=')[1].strip(), 16) - break - if self.ram_base is None: - self.ram_base = -1 - raise Exception('Failed to find RAM bank start in `bdinfo`') - - return self.ram_base diff --git a/test/py/u_boot_console_exec_attach.py b/test/py/u_boot_console_exec_attach.py index 0ca9e7c..19520cb 100644 --- a/test/py/u_boot_console_exec_attach.py +++ b/test/py/u_boot_console_exec_attach.py @@ -11,15 +11,15 @@ from u_boot_spawn import Spawn from u_boot_console_base import ConsoleBase class ConsoleExecAttach(ConsoleBase): - '''Represents a physical connection to a U-Boot console, typically via a + """Represents a physical connection to a U-Boot console, typically via a serial port. This implementation executes a sub-process to attach to the console, expecting that the stdin/out of the sub-process will be forwarded to/from the physical hardware. This approach isolates the test infra- structure from the user-/installation-specific details of how to - communicate with, and the identity of, serial ports etc.''' + communicate with, and the identity of, serial ports etc.""" def __init__(self, log, config): - '''Initialize a U-Boot console connection. + """Initialize a U-Boot console connection. Args: log: A multiplexed_log.Logfile instance. @@ -27,7 +27,7 @@ class ConsoleExecAttach(ConsoleBase): Returns: Nothing. - ''' + """ # The max_fifo_fill value might need tweaking per-board/-SoC? # 1 would be safe anywhere, but is very slow (a pexpect issue?). @@ -42,7 +42,7 @@ class ConsoleExecAttach(ConsoleBase): runner.close() def get_spawn(self): - '''Connect to a fresh U-Boot instance. + """Connect to a fresh U-Boot instance. The target board is reset, so that U-Boot begins running from scratch. @@ -51,7 +51,7 @@ class ConsoleExecAttach(ConsoleBase): Returns: A u_boot_spawn.Spawn object that is attached to U-Boot. - ''' + """ args = [self.config.board_type, self.config.board_identity] s = Spawn(['u-boot-test-console'] + args) diff --git a/test/py/u_boot_console_sandbox.py b/test/py/u_boot_console_sandbox.py index 88b137e..a7263f3 100644 --- a/test/py/u_boot_console_sandbox.py +++ b/test/py/u_boot_console_sandbox.py @@ -10,11 +10,11 @@ from u_boot_spawn import Spawn from u_boot_console_base import ConsoleBase class ConsoleSandbox(ConsoleBase): - '''Represents a connection to a sandbox U-Boot console, executed as a sub- - process.''' + """Represents a connection to a sandbox U-Boot console, executed as a sub- + process.""" def __init__(self, log, config): - '''Initialize a U-Boot console connection. + """Initialize a U-Boot console connection. Args: log: A multiplexed_log.Logfile instance. @@ -22,12 +22,12 @@ class ConsoleSandbox(ConsoleBase): Returns: Nothing. - ''' + """ super(ConsoleSandbox, self).__init__(log, config, max_fifo_fill=1024) def get_spawn(self): - '''Connect to a fresh U-Boot instance. + """Connect to a fresh U-Boot instance. A new sandbox process is created, so that U-Boot begins running from scratch. @@ -37,26 +37,30 @@ class ConsoleSandbox(ConsoleBase): Returns: A u_boot_spawn.Spawn object that is attached to U-Boot. - ''' + """ - return Spawn([self.config.build_dir + '/u-boot']) + cmd = [ + self.config.build_dir + '/u-boot', + '-d', + self.config.build_dir + '/arch/sandbox/dts/test.dtb' + ] + return Spawn(cmd, cwd=self.config.source_dir) def kill(self, sig): - '''Send a specific Unix signal to the sandbox process. + """Send a specific Unix signal to the sandbox process. Args: sig: The Unix signal to send to the process. Returns: Nothing. - ''' + """ - self.ensure_spawned() self.log.action('kill %d' % sig) self.p.kill(sig) def validate_exited(self): - '''Determine whether the sandbox process has exited. + """Determine whether the sandbox process has exited. If required, this function waits a reasonable time for the process to exit. @@ -66,7 +70,7 @@ class ConsoleSandbox(ConsoleBase): Returns: Boolean indicating whether the process has exited. - ''' + """ p = self.p self.p = None diff --git a/test/py/u_boot_spawn.py b/test/py/u_boot_spawn.py index 1baee63..0f52d3e 100644 --- a/test/py/u_boot_spawn.py +++ b/test/py/u_boot_spawn.py @@ -12,23 +12,25 @@ import select import time class Timeout(Exception): - '''An exception sub-class that indicates that a timeout occurred.''' + """An exception sub-class that indicates that a timeout occurred.""" pass class Spawn(object): - '''Represents the stdio of a freshly created sub-process. Commands may be + """Represents the stdio of a freshly created sub-process. Commands may be sent to the process, and responses waited for. - ''' + """ - def __init__(self, args): - '''Spawn (fork/exec) the sub-process. + def __init__(self, args, cwd=None): + """Spawn (fork/exec) the sub-process. Args: - args: array of processs arguments. argv[0] is the command to execute. + args: array of processs arguments. argv[0] is the command to + execute. + cwd: the directory to run the process in, or None for no change. Returns: Nothing. - ''' + """ self.waited = False self.buf = '' @@ -44,6 +46,8 @@ class Spawn(object): # run under "go" (www.go.cd). Perhaps this happens under any # background (non-interactive) system? signal.signal(signal.SIGHUP, signal.SIG_DFL) + if cwd: + os.chdir(cwd) os.execvp(args[0], args) except: print 'CHILD EXECEPTION:' @@ -56,26 +60,26 @@ class Spawn(object): self.poll.register(self.fd, select.POLLIN | select.POLLPRI | select.POLLERR | select.POLLHUP | select.POLLNVAL) def kill(self, sig): - '''Send unix signal "sig" to the child process. + """Send unix signal "sig" to the child process. Args: sig: The signal number to send. Returns: Nothing. - ''' + """ os.kill(self.pid, sig) def isalive(self): - '''Determine whether the child process is still running. + """Determine whether the child process is still running. Args: None. Returns: Boolean indicating whether process is alive. - ''' + """ if self.waited: return False @@ -88,19 +92,19 @@ class Spawn(object): return False def send(self, data): - '''Send data to the sub-process's stdin. + """Send data to the sub-process's stdin. Args: data: The data to send to the process. Returns: Nothing. - ''' + """ os.write(self.fd, data) def expect(self, patterns): - '''Wait for the sub-process to emit specific data. + """Wait for the sub-process to emit specific data. This function waits for the process to emit one pattern from the supplied list of patterns, or for a timeout to occur. @@ -116,12 +120,13 @@ class Spawn(object): Notable exceptions: Timeout, if the process did not emit any of the patterns within the expected time. - ''' + """ for pi in xrange(len(patterns)): if type(patterns[pi]) == type(''): patterns[pi] = re.compile(patterns[pi]) + tstart_s = time.time() try: while True: earliest_m = None @@ -131,7 +136,7 @@ class Spawn(object): m = pattern.search(self.buf) if not m: continue - if earliest_m and m.start() > earliest_m.start(): + if earliest_m and m.start() >= earliest_m.start(): continue earliest_m = m earliest_pi = pi @@ -142,7 +147,11 @@ class Spawn(object): self.after = self.buf[pos:posafter] self.buf = self.buf[posafter:] return earliest_pi - events = self.poll.poll(self.timeout) + tnow_s = time.time() + tdelta_ms = (tnow_s - tstart_s) * 1000 + if tdelta_ms > self.timeout: + raise Timeout() + events = self.poll.poll(self.timeout - tdelta_ms) if not events: raise Timeout() c = os.read(self.fd, 1024) @@ -156,7 +165,7 @@ class Spawn(object): self.logfile_read.flush() def close(self): - '''Close the stdio connection to the sub-process. + """Close the stdio connection to the sub-process. This also waits a reasonable time for the sub-process to stop running. @@ -165,7 +174,7 @@ class Spawn(object): Returns: Nothing. - ''' + """ os.close(self.fd) for i in xrange(100): diff --git a/test/py/u_boot_utils.py b/test/py/u_boot_utils.py new file mode 100644 index 0000000..72d24e4 --- /dev/null +++ b/test/py/u_boot_utils.py @@ -0,0 +1,209 @@ +# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. +# +# SPDX-License-Identifier: GPL-2.0 + +# Utility code shared across multiple tests. + +import hashlib +import os +import os.path +import sys +import time + +def md5sum_data(data): + """Calculate the MD5 hash of some data. + + Args: + data: The data to hash. + + Returns: + The hash of the data, as a binary string. + """ + + h = hashlib.md5() + h.update(data) + return h.digest() + +def md5sum_file(fn, max_length=None): + """Calculate the MD5 hash of the contents of a file. + + Args: + fn: The filename of the file to hash. + max_length: The number of bytes to hash. If the file has more + bytes than this, they will be ignored. If None or omitted, the + entire file will be hashed. + + Returns: + The hash of the file content, as a binary string. + """ + + with open(fn, 'rb') as fh: + if max_length: + params = [max_length] + else: + params = [] + data = fh.read(*params) + return md5sum_data(data) + +class PersistentRandomFile(object): + """Generate and store information about a persistent file containing + random data.""" + + def __init__(self, u_boot_console, fn, size): + """Create or process the persistent file. + + If the file does not exist, it is generated. + + If the file does exist, its content is hashed for later comparison. + + These files are always located in the "persistent data directory" of + the current test run. + + Args: + u_boot_console: A console connection to U-Boot. + fn: The filename (without path) to create. + size: The desired size of the file in bytes. + + Returns: + Nothing. + """ + + self.fn = fn + + self.abs_fn = u_boot_console.config.persistent_data_dir + '/' + fn + + if os.path.exists(self.abs_fn): + u_boot_console.log.action('Persistent data file ' + self.abs_fn + + ' already exists') + self.content_hash = md5sum_file(self.abs_fn) + else: + u_boot_console.log.action('Generating ' + self.abs_fn + + ' (random, persistent, %d bytes)' % size) + data = os.urandom(size) + with open(self.abs_fn, 'wb') as fh: + fh.write(data) + self.content_hash = md5sum_data(data) + +def attempt_to_open_file(fn): + """Attempt to open a file, without throwing exceptions. + + Any errors (exceptions) that occur during the attempt to open the file + are ignored. This is useful in order to test whether a file (in + particular, a device node) exists and can be successfully opened, in order + to poll for e.g. USB enumeration completion. + + Args: + fn: The filename to attempt to open. + + Returns: + An open file handle to the file, or None if the file could not be + opened. + """ + + try: + return open(fn, 'rb') + except: + return None + +def wait_until_open_succeeds(fn): + """Poll until a file can be opened, or a timeout occurs. + + Continually attempt to open a file, and return when this succeeds, or + raise an exception after a timeout. + + Args: + fn: The filename to attempt to open. + + Returns: + An open file handle to the file. + """ + + for i in xrange(100): + fh = attempt_to_open_file(fn) + if fh: + return fh + time.sleep(0.1) + raise Exception('File could not be opened') + +def wait_until_file_open_fails(fn, ignore_errors): + """Poll until a file cannot be opened, or a timeout occurs. + + Continually attempt to open a file, and return when this fails, or + raise an exception after a timeout. + + Args: + fn: The filename to attempt to open. + ignore_errors: Indicate whether to ignore timeout errors. If True, the + function will simply return if a timeout occurs, otherwise an + exception will be raised. + + Returns: + Nothing. + """ + + for i in xrange(100): + fh = attempt_to_open_file(fn) + if not fh: + return + fh.close() + time.sleep(0.1) + if ignore_errors: + return + raise Exception('File can still be opened') + +def run_and_log(u_boot_console, cmd, ignore_errors=False): + """Run a command and log its output. + + Args: + u_boot_console: A console connection to U-Boot. + cmd: The command to run, as an array of argv[]. + ignore_errors: Indicate whether to ignore errors. If True, the function + will simply return if the command cannot be executed or exits with + an error code, otherwise an exception will be raised if such + problems occur. + + Returns: + Nothing. + """ + + runner = u_boot_console.log.get_runner(cmd[0], sys.stdout) + runner.run(cmd, ignore_errors=ignore_errors) + runner.close() + +ram_base = None +def find_ram_base(u_boot_console): + """Find the running U-Boot's RAM location. + + Probe the running U-Boot to determine the address of the first bank + of RAM. This is useful for tests that test reading/writing RAM, or + load/save files that aren't associated with some standard address + typically represented in an environment variable such as + ${kernel_addr_r}. The value is cached so that it only needs to be + actively read once. + + Args: + u_boot_console: A console connection to U-Boot. + + Returns: + The address of U-Boot's first RAM bank, as an integer. + """ + + global ram_base + if u_boot_console.config.buildconfig.get('config_cmd_bdi', 'n') != 'y': + pytest.skip('bdinfo command not supported') + if ram_base == -1: + pytest.skip('Previously failed to find RAM bank start') + if ram_base is not None: + return ram_base + + with u_boot_console.log.section('find_ram_base'): + response = u_boot_console.run_command('bdinfo') + for l in response.split('\n'): + if '-> start' in l: + ram_base = int(l.split('=')[1].strip(), 16) + break + if ram_base is None: + ram_base = -1 + raise Exception('Failed to find RAM bank start in `bdinfo`') + + return ram_base |