summaryrefslogtreecommitdiff
path: root/test/py
diff options
context:
space:
mode:
Diffstat (limited to 'test/py')
-rw-r--r--test/py/conftest.py118
-rw-r--r--test/py/multiplexed_log.css8
-rw-r--r--test/py/multiplexed_log.py228
-rwxr-xr-xtest/py/test.py8
-rw-r--r--test/py/tests/test_000_version.py2
-rw-r--r--test/py/tests/test_dfu.py279
-rw-r--r--test/py/tests/test_env.py66
-rw-r--r--test/py/tests/test_help.py2
-rw-r--r--test/py/tests/test_hush_if_test.py10
-rw-r--r--test/py/tests/test_md.py13
-rw-r--r--test/py/tests/test_net.py155
-rw-r--r--test/py/tests/test_sandbox_exit.py6
-rw-r--r--test/py/tests/test_shell_basics.py12
-rw-r--r--test/py/tests/test_sleep.py8
-rw-r--r--test/py/tests/test_ums.py243
-rw-r--r--test/py/tests/test_unknown_cmd.py4
-rw-r--r--test/py/u_boot_console_base.py226
-rw-r--r--test/py/u_boot_console_exec_attach.py12
-rw-r--r--test/py/u_boot_console_sandbox.py28
-rw-r--r--test/py/u_boot_spawn.py47
-rw-r--r--test/py/u_boot_utils.py209
21 files changed, 1286 insertions, 398 deletions
diff --git a/test/py/conftest.py b/test/py/conftest.py
index e1674df..3e162ca 100644
--- a/test/py/conftest.py
+++ b/test/py/conftest.py
@@ -29,7 +29,7 @@ log = None
console = None
def mkdir_p(path):
- '''Create a directory path.
+ """Create a directory path.
This includes creating any intermediate/parent directories. Any errors
caused due to already extant directories are ignored.
@@ -39,7 +39,7 @@ def mkdir_p(path):
Returns:
Nothing.
- '''
+ """
try:
os.makedirs(path)
@@ -50,14 +50,14 @@ def mkdir_p(path):
raise
def pytest_addoption(parser):
- '''pytest hook: Add custom command-line options to the cmdline parser.
+ """pytest hook: Add custom command-line options to the cmdline parser.
Args:
parser: The pytest command-line parser.
Returns:
Nothing.
- '''
+ """
parser.addoption('--build-dir', default=None,
help='U-Boot build directory (O=)')
@@ -73,14 +73,14 @@ def pytest_addoption(parser):
help='Compile U-Boot before running tests')
def pytest_configure(config):
- '''pytest hook: Perform custom initialization at startup time.
+ """pytest hook: Perform custom initialization at startup time.
Args:
config: The pytest configuration.
Returns:
Nothing.
- '''
+ """
global log
global console
@@ -190,7 +190,7 @@ def pytest_configure(config):
console = u_boot_console_exec_attach.ConsoleExecAttach(log, ubconfig)
def pytest_generate_tests(metafunc):
- '''pytest hook: parameterize test functions based on custom rules.
+ """pytest hook: parameterize test functions based on custom rules.
If a test function takes parameter(s) (fixture names) of the form brd__xxx
or env__xxx, the brd and env configuration dictionaries are consulted to
@@ -202,7 +202,7 @@ def pytest_generate_tests(metafunc):
Returns:
Nothing.
- '''
+ """
subconfigs = {
'brd': console.config.brd,
@@ -225,28 +225,37 @@ def pytest_generate_tests(metafunc):
# ... otherwise, see if there's a key that contains a list of
# values to use instead.
vals = subconfig.get(fn + 's', [])
- metafunc.parametrize(fn, vals)
-
-@pytest.fixture(scope='session')
+ def fixture_id(index, val):
+ try:
+ return val["fixture_id"]
+ except:
+ return fn + str(index)
+ ids = [fixture_id(index, val) for (index, val) in enumerate(vals)]
+ metafunc.parametrize(fn, vals, ids=ids)
+
+@pytest.fixture(scope='function')
def u_boot_console(request):
- '''Generate the value of a test's u_boot_console fixture.
+ """Generate the value of a test's u_boot_console fixture.
Args:
request: The pytest request.
Returns:
The fixture value.
- '''
+ """
+ console.ensure_spawned()
return console
tests_not_run = set()
tests_failed = set()
+tests_xpassed = set()
+tests_xfailed = set()
tests_skipped = set()
tests_passed = set()
def pytest_itemcollected(item):
- '''pytest hook: Called once for each test found during collection.
+ """pytest hook: Called once for each test found during collection.
This enables our custom result analysis code to see the list of all tests
that should eventually be run.
@@ -256,12 +265,12 @@ def pytest_itemcollected(item):
Returns:
Nothing.
- '''
+ """
tests_not_run.add(item.name)
def cleanup():
- '''Clean up all global state.
+ """Clean up all global state.
Executed (via atexit) once the entire test process is complete. This
includes logging the status of all tests, and the identity of any failed
@@ -272,7 +281,7 @@ def cleanup():
Returns:
Nothing.
- '''
+ """
if console:
console.close()
@@ -282,6 +291,14 @@ def cleanup():
log.status_skipped('%d skipped' % len(tests_skipped))
for test in tests_skipped:
log.status_skipped('... ' + test)
+ if tests_xpassed:
+ log.status_xpass('%d xpass' % len(tests_xpassed))
+ for test in tests_xpassed:
+ log.status_xpass('... ' + test)
+ if tests_xfailed:
+ log.status_xfail('%d xfail' % len(tests_xfailed))
+ for test in tests_xfailed:
+ log.status_xfail('... ' + test)
if tests_failed:
log.status_fail('%d failed' % len(tests_failed))
for test in tests_failed:
@@ -294,7 +311,7 @@ def cleanup():
atexit.register(cleanup)
def setup_boardspec(item):
- '''Process any 'boardspec' marker for a test.
+ """Process any 'boardspec' marker for a test.
Such a marker lists the set of board types that a test does/doesn't
support. If tests are being executed on an unsupported board, the test is
@@ -305,7 +322,7 @@ def setup_boardspec(item):
Returns:
Nothing.
- '''
+ """
mark = item.get_marker('boardspec')
if not mark:
@@ -322,7 +339,7 @@ def setup_boardspec(item):
pytest.skip('board not supported')
def setup_buildconfigspec(item):
- '''Process any 'buildconfigspec' marker for a test.
+ """Process any 'buildconfigspec' marker for a test.
Such a marker lists some U-Boot configuration feature that the test
requires. If tests are being executed on an U-Boot build that doesn't
@@ -333,7 +350,7 @@ def setup_buildconfigspec(item):
Returns:
Nothing.
- '''
+ """
mark = item.get_marker('buildconfigspec')
if not mark:
@@ -343,7 +360,7 @@ def setup_buildconfigspec(item):
pytest.skip('.config feature not enabled')
def pytest_runtest_setup(item):
- '''pytest hook: Configure (set up) a test item.
+ """pytest hook: Configure (set up) a test item.
Called once for each test to perform any custom configuration. This hook
is used to skip the test if certain conditions apply.
@@ -353,14 +370,14 @@ def pytest_runtest_setup(item):
Returns:
Nothing.
- '''
+ """
log.start_section(item.name)
setup_boardspec(item)
setup_buildconfigspec(item)
def pytest_runtest_protocol(item, nextitem):
- '''pytest hook: Called to execute a test.
+ """pytest hook: Called to execute a test.
This hook wraps the standard pytest runtestprotocol() function in order
to acquire visibility into, and record, each test function's result.
@@ -371,36 +388,45 @@ def pytest_runtest_protocol(item, nextitem):
Returns:
A list of pytest reports (test result data).
- '''
+ """
reports = runtestprotocol(item, nextitem=nextitem)
- failed = None
- skipped = None
+
+ failure_cleanup = False
+ test_list = tests_passed
+ msg = 'OK'
+ msg_log = log.status_pass
for report in reports:
if report.outcome == 'failed':
- failed = report
+ if hasattr(report, 'wasxfail'):
+ test_list = tests_xpassed
+ msg = 'XPASSED'
+ msg_log = log.status_xpass
+ else:
+ failure_cleanup = True
+ test_list = tests_failed
+ msg = 'FAILED:\n' + str(report.longrepr)
+ msg_log = log.status_fail
break
if report.outcome == 'skipped':
- if not skipped:
- skipped = report
-
- if failed:
- tests_failed.add(item.name)
- elif skipped:
- tests_skipped.add(item.name)
- else:
- tests_passed.add(item.name)
+ if hasattr(report, 'wasxfail'):
+ failure_cleanup = True
+ test_list = tests_xfailed
+ msg = 'XFAILED:\n' + str(report.longrepr)
+ msg_log = log.status_xfail
+ break
+ test_list = tests_skipped
+ msg = 'SKIPPED:\n' + str(report.longrepr)
+ msg_log = log.status_skipped
+
+ if failure_cleanup:
+ console.drain_console()
+
+ test_list.add(item.name)
tests_not_run.remove(item.name)
try:
- if failed:
- msg = 'FAILED:\n' + str(failed.longrepr)
- log.status_fail(msg)
- elif skipped:
- msg = 'SKIPPED:\n' + str(skipped.longrepr)
- log.status_skipped(msg)
- else:
- log.status_pass('OK')
+ msg_log(msg)
except:
# If something went wrong with logging, it's better to let the test
# process continue, which may report other exceptions that triggered
@@ -416,7 +442,7 @@ def pytest_runtest_protocol(item, nextitem):
log.end_section(item.name)
- if failed:
+ if failure_cleanup:
console.cleanup_spawn()
return reports
diff --git a/test/py/multiplexed_log.css b/test/py/multiplexed_log.css
index 50f7b90..f6240d5 100644
--- a/test/py/multiplexed_log.css
+++ b/test/py/multiplexed_log.css
@@ -83,6 +83,14 @@ pre {
color: #ffff00
}
+.status-xfail {
+ color: #ff7f00
+}
+
+.status-xpass {
+ color: #ff7f00
+}
+
.status-fail {
color: #ff0000
}
diff --git a/test/py/multiplexed_log.py b/test/py/multiplexed_log.py
index 48f2b51..69a577e 100644
--- a/test/py/multiplexed_log.py
+++ b/test/py/multiplexed_log.py
@@ -14,12 +14,12 @@ import subprocess
mod_dir = os.path.dirname(os.path.abspath(__file__))
class LogfileStream(object):
- '''A file-like object used to write a single logical stream of data into
+ """A file-like object used to write a single logical stream of data into
a multiplexed log file. Objects of this type should be created by factory
- functions in the Logfile class rather than directly.'''
+ functions in the Logfile class rather than directly."""
def __init__(self, logfile, name, chained_file):
- '''Initialize a new object.
+ """Initialize a new object.
Args:
logfile: The Logfile object to log to.
@@ -29,26 +29,26 @@ class LogfileStream(object):
Returns:
Nothing.
- '''
+ """
self.logfile = logfile
self.name = name
self.chained_file = chained_file
def close(self):
- '''Dummy function so that this class is "file-like".
+ """Dummy function so that this class is "file-like".
Args:
None.
Returns:
Nothing.
- '''
+ """
pass
def write(self, data, implicit=False):
- '''Write data to the log stream.
+ """Write data to the log stream.
Args:
data: The data to write tot he file.
@@ -60,33 +60,33 @@ class LogfileStream(object):
Returns:
Nothing.
- '''
+ """
self.logfile.write(self, data, implicit)
if self.chained_file:
self.chained_file.write(data)
def flush(self):
- '''Flush the log stream, to ensure correct log interleaving.
+ """Flush the log stream, to ensure correct log interleaving.
Args:
None.
Returns:
Nothing.
- '''
+ """
self.logfile.flush()
if self.chained_file:
self.chained_file.flush()
class RunAndLog(object):
- '''A utility object used to execute sub-processes and log their output to
+ """A utility object used to execute sub-processes and log their output to
a multiplexed log file. Objects of this type should be created by factory
- functions in the Logfile class rather than directly.'''
+ functions in the Logfile class rather than directly."""
def __init__(self, logfile, name, chained_file):
- '''Initialize a new object.
+ """Initialize a new object.
Args:
logfile: The Logfile object to log to.
@@ -96,29 +96,33 @@ class RunAndLog(object):
Returns:
Nothing.
- '''
+ """
self.logfile = logfile
self.name = name
self.chained_file = chained_file
def close(self):
- '''Clean up any resources managed by this object.'''
+ """Clean up any resources managed by this object."""
pass
- def run(self, cmd, cwd=None):
- '''Run a command as a sub-process, and log the results.
+ def run(self, cmd, cwd=None, ignore_errors=False):
+ """Run a command as a sub-process, and log the results.
Args:
cmd: The command to execute.
cwd: The directory to run the command in. Can be None to use the
current directory.
+ ignore_errors: Indicate whether to ignore errors. If True, the
+ function will simply return if the command cannot be executed
+ or exits with an error code, otherwise an exception will be
+ raised if such problems occur.
Returns:
Nothing.
- '''
+ """
- msg = "+" + " ".join(cmd) + "\n"
+ msg = '+' + ' '.join(cmd) + '\n'
if self.chained_file:
self.chained_file.write(msg)
self.logfile.write(self, msg)
@@ -148,7 +152,7 @@ class RunAndLog(object):
exception = e
if output and not output.endswith('\n'):
output += '\n'
- if exit_status and not exception:
+ if exit_status and not exception and not ignore_errors:
exception = Exception('Exit code: ' + str(exit_status))
if exception:
output += str(exception) + '\n'
@@ -159,13 +163,13 @@ class RunAndLog(object):
raise exception
class SectionCtxMgr(object):
- '''A context manager for Python's "with" statement, which allows a certain
+ """A context manager for Python's "with" statement, which allows a certain
portion of test code to be logged to a separate section of the log file.
Objects of this type should be created by factory functions in the Logfile
- class rather than directly.'''
+ class rather than directly."""
def __init__(self, log, marker):
- '''Initialize a new object.
+ """Initialize a new object.
Args:
log: The Logfile object to log to.
@@ -173,7 +177,7 @@ class SectionCtxMgr(object):
Returns:
Nothing.
- '''
+ """
self.log = log
self.marker = marker
@@ -185,35 +189,35 @@ class SectionCtxMgr(object):
self.log.end_section(self.marker)
class Logfile(object):
- '''Generates an HTML-formatted log file containing multiple streams of
- data, each represented in a well-delineated/-structured fashion.'''
+ """Generates an HTML-formatted log file containing multiple streams of
+ data, each represented in a well-delineated/-structured fashion."""
def __init__(self, fn):
- '''Initialize a new object.
+ """Initialize a new object.
Args:
fn: The filename to write to.
Returns:
Nothing.
- '''
+ """
- self.f = open(fn, "wt")
+ self.f = open(fn, 'wt')
self.last_stream = None
self.blocks = []
self.cur_evt = 1
- shutil.copy(mod_dir + "/multiplexed_log.css", os.path.dirname(fn))
- self.f.write("""\
+ shutil.copy(mod_dir + '/multiplexed_log.css', os.path.dirname(fn))
+ self.f.write('''\
<html>
<head>
<link rel="stylesheet" type="text/css" href="multiplexed_log.css">
</head>
<body>
<tt>
-""")
+''')
def close(self):
- '''Close the log file.
+ """Close the log file.
After calling this function, no more data may be written to the log.
@@ -222,22 +226,22 @@ class Logfile(object):
Returns:
Nothing.
- '''
+ """
- self.f.write("""\
+ self.f.write('''\
</tt>
</body>
</html>
-""")
+''')
self.f.close()
# The set of characters that should be represented as hexadecimal codes in
# the log file.
- _nonprint = ("%" + "".join(chr(c) for c in range(0, 32) if c not in (9, 10)) +
- "".join(chr(c) for c in range(127, 256)))
+ _nonprint = ('%' + ''.join(chr(c) for c in range(0, 32) if c not in (9, 10)) +
+ ''.join(chr(c) for c in range(127, 256)))
def _escape(self, data):
- '''Render data format suitable for inclusion in an HTML document.
+ """Render data format suitable for inclusion in an HTML document.
This includes HTML-escaping certain characters, and translating
control characters to a hexadecimal representation.
@@ -247,36 +251,36 @@ class Logfile(object):
Returns:
An escaped version of the data.
- '''
+ """
- data = data.replace(chr(13), "")
- data = "".join((c in self._nonprint) and ("%%%02x" % ord(c)) or
+ data = data.replace(chr(13), '')
+ data = ''.join((c in self._nonprint) and ('%%%02x' % ord(c)) or
c for c in data)
data = cgi.escape(data)
return data
def _terminate_stream(self):
- '''Write HTML to the log file to terminate the current stream's data.
+ """Write HTML to the log file to terminate the current stream's data.
Args:
None.
Returns:
Nothing.
- '''
+ """
self.cur_evt += 1
if not self.last_stream:
return
- self.f.write("</pre>\n")
- self.f.write("<div class=\"stream-trailer\" id=\"" +
- self.last_stream.name + "\">End stream: " +
- self.last_stream.name + "</div>\n")
- self.f.write("</div>\n")
+ self.f.write('</pre>\n')
+ self.f.write('<div class="stream-trailer" id="' +
+ self.last_stream.name + '">End stream: ' +
+ self.last_stream.name + '</div>\n')
+ self.f.write('</div>\n')
self.last_stream = None
def _note(self, note_type, msg):
- '''Write a note or one-off message to the log file.
+ """Write a note or one-off message to the log file.
Args:
note_type: The type of note. This must be a value supported by the
@@ -285,32 +289,32 @@ class Logfile(object):
Returns:
Nothing.
- '''
+ """
self._terminate_stream()
- self.f.write("<div class=\"" + note_type + "\">\n<pre>")
+ self.f.write('<div class="' + note_type + '">\n<pre>')
self.f.write(self._escape(msg))
- self.f.write("\n</pre></div>\n")
+ self.f.write('\n</pre></div>\n')
def start_section(self, marker):
- '''Begin a new nested section in the log file.
+ """Begin a new nested section in the log file.
Args:
marker: The name of the section that is starting.
Returns:
Nothing.
- '''
+ """
self._terminate_stream()
self.blocks.append(marker)
- blk_path = "/".join(self.blocks)
- self.f.write("<div class=\"section\" id=\"" + blk_path + "\">\n")
- self.f.write("<div class=\"section-header\" id=\"" + blk_path +
- "\">Section: " + blk_path + "</div>\n")
+ blk_path = '/'.join(self.blocks)
+ self.f.write('<div class="section" id="' + blk_path + '">\n')
+ self.f.write('<div class="section-header" id="' + blk_path +
+ '">Section: ' + blk_path + '</div>\n')
def end_section(self, marker):
- '''Terminate the current nested section in the log file.
+ """Terminate the current nested section in the log file.
This function validates proper nesting of start_section() and
end_section() calls. If a mismatch is found, an exception is raised.
@@ -320,20 +324,20 @@ class Logfile(object):
Returns:
Nothing.
- '''
+ """
if (not self.blocks) or (marker != self.blocks[-1]):
- raise Exception("Block nesting mismatch: \"%s\" \"%s\"" %
- (marker, "/".join(self.blocks)))
+ raise Exception('Block nesting mismatch: "%s" "%s"' %
+ (marker, '/'.join(self.blocks)))
self._terminate_stream()
- blk_path = "/".join(self.blocks)
- self.f.write("<div class=\"section-trailer\" id=\"section-trailer-" +
- blk_path + "\">End section: " + blk_path + "</div>\n")
- self.f.write("</div>\n")
+ blk_path = '/'.join(self.blocks)
+ self.f.write('<div class="section-trailer" id="section-trailer-' +
+ blk_path + '">End section: ' + blk_path + '</div>\n')
+ self.f.write('</div>\n')
self.blocks.pop()
def section(self, marker):
- '''Create a temporary section in the log file.
+ """Create a temporary section in the log file.
This function creates a context manager for Python's "with" statement,
which allows a certain portion of test code to be logged to a separate
@@ -348,96 +352,120 @@ class Logfile(object):
Returns:
A context manager object.
- '''
+ """
return SectionCtxMgr(self, marker)
def error(self, msg):
- '''Write an error note to the log file.
+ """Write an error note to the log file.
Args:
msg: A message describing the error.
Returns:
Nothing.
- '''
+ """
self._note("error", msg)
def warning(self, msg):
- '''Write an warning note to the log file.
+ """Write an warning note to the log file.
Args:
msg: A message describing the warning.
Returns:
Nothing.
- '''
+ """
self._note("warning", msg)
def info(self, msg):
- '''Write an informational note to the log file.
+ """Write an informational note to the log file.
Args:
msg: An informational message.
Returns:
Nothing.
- '''
+ """
self._note("info", msg)
def action(self, msg):
- '''Write an action note to the log file.
+ """Write an action note to the log file.
Args:
msg: A message describing the action that is being logged.
Returns:
Nothing.
- '''
+ """
self._note("action", msg)
def status_pass(self, msg):
- '''Write a note to the log file describing test(s) which passed.
+ """Write a note to the log file describing test(s) which passed.
Args:
- msg: A message describing passed test(s).
+ msg: A message describing the passed test(s).
Returns:
Nothing.
- '''
+ """
self._note("status-pass", msg)
def status_skipped(self, msg):
- '''Write a note to the log file describing skipped test(s).
+ """Write a note to the log file describing skipped test(s).
Args:
- msg: A message describing passed test(s).
+ msg: A message describing the skipped test(s).
Returns:
Nothing.
- '''
+ """
self._note("status-skipped", msg)
+ def status_xfail(self, msg):
+ """Write a note to the log file describing xfailed test(s).
+
+ Args:
+ msg: A message describing the xfailed test(s).
+
+ Returns:
+ Nothing.
+ """
+
+ self._note("status-xfail", msg)
+
+ def status_xpass(self, msg):
+ """Write a note to the log file describing xpassed test(s).
+
+ Args:
+ msg: A message describing the xpassed test(s).
+
+ Returns:
+ Nothing.
+ """
+
+ self._note("status-xpass", msg)
+
def status_fail(self, msg):
- '''Write a note to the log file describing failed test(s).
+ """Write a note to the log file describing failed test(s).
Args:
- msg: A message describing passed test(s).
+ msg: A message describing the failed test(s).
Returns:
Nothing.
- '''
+ """
self._note("status-fail", msg)
def get_stream(self, name, chained_file=None):
- '''Create an object to log a single stream's data into the log file.
+ """Create an object to log a single stream's data into the log file.
This creates a "file-like" object that can be written to in order to
write a single stream's data to the log file. The implementation will
@@ -452,12 +480,12 @@ class Logfile(object):
Returns:
A file-like object.
- '''
+ """
return LogfileStream(self, name, chained_file)
def get_runner(self, name, chained_file=None):
- '''Create an object that executes processes and logs their output.
+ """Create an object that executes processes and logs their output.
Args:
name: The name of this sub-process.
@@ -466,12 +494,12 @@ class Logfile(object):
Returns:
A RunAndLog object.
- '''
+ """
return RunAndLog(self, name, chained_file)
def write(self, stream, data, implicit=False):
- '''Write stream data into the log file.
+ """Write stream data into the log file.
This function should only be used by instances of LogfileStream or
RunAndLog.
@@ -487,29 +515,29 @@ class Logfile(object):
Returns:
Nothing.
- '''
+ """
if stream != self.last_stream:
self._terminate_stream()
- self.f.write("<div class=\"stream\" id=\"%s\">\n" % stream.name)
- self.f.write("<div class=\"stream-header\" id=\"" + stream.name +
- "\">Stream: " + stream.name + "</div>\n")
- self.f.write("<pre>")
+ self.f.write('<div class="stream" id="%s">\n' % stream.name)
+ self.f.write('<div class="stream-header" id="' + stream.name +
+ '">Stream: ' + stream.name + '</div>\n')
+ self.f.write('<pre>')
if implicit:
- self.f.write("<span class=\"implicit\">")
+ self.f.write('<span class="implicit">')
self.f.write(self._escape(data))
if implicit:
- self.f.write("</span>")
+ self.f.write('</span>')
self.last_stream = stream
def flush(self):
- '''Flush the log stream, to ensure correct log interleaving.
+ """Flush the log stream, to ensure correct log interleaving.
Args:
None.
Returns:
Nothing.
- '''
+ """
self.f.flush()
diff --git a/test/py/test.py b/test/py/test.py
index 9c23898..95671d4 100755
--- a/test/py/test.py
+++ b/test/py/test.py
@@ -16,17 +16,17 @@ import sys
sys.argv.pop(0)
# argv; py.test test_directory_name user-supplied-arguments
-args = ["py.test", os.path.dirname(__file__) + "/tests"]
+args = ['py.test', os.path.dirname(__file__) + '/tests']
args.extend(sys.argv)
try:
- os.execvp("py.test", args)
+ os.execvp('py.test', args)
except:
# Log full details of any exception for detailed analysis
import traceback
traceback.print_exc()
# Hint to the user that they likely simply haven't installed the required
# dependencies.
- print >>sys.stderr, """
+ print >>sys.stderr, '''
exec(py.test) failed; perhaps you are missing some dependencies?
-See test/py/README.md for the list."""
+See test/py/README.md for the list.'''
diff --git a/test/py/tests/test_000_version.py b/test/py/tests/test_000_version.py
index d262f05..43a02e7 100644
--- a/test/py/tests/test_000_version.py
+++ b/test/py/tests/test_000_version.py
@@ -9,7 +9,7 @@
# command prompt.
def test_version(u_boot_console):
- '''Test that the "version" command prints the U-Boot version.'''
+ """Test that the "version" command prints the U-Boot version."""
# "version" prints the U-Boot sign-on message. This is usually considered
# an error, so that any unexpected reboot causes an error. Here, this
diff --git a/test/py/tests/test_dfu.py b/test/py/tests/test_dfu.py
new file mode 100644
index 0000000..093e8d0
--- /dev/null
+++ b/test/py/tests/test_dfu.py
@@ -0,0 +1,279 @@
+# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved.
+#
+# SPDX-License-Identifier: GPL-2.0
+
+# Test U-Boot's "dfu" command. The test starts DFU in U-Boot, waits for USB
+# device enumeration on the host, executes dfu-util multiple times to test
+# various transfer sizes, many of which trigger USB driver edge cases, and
+# finally aborts the "dfu" command in U-Boot.
+
+import os
+import os.path
+import pytest
+import u_boot_utils
+
+"""
+Note: This test relies on:
+
+a) boardenv_* to contain configuration values to define which USB ports are
+available for testing. Without this, this test will be automatically skipped.
+For example:
+
+env__usb_dev_ports = (
+ {
+ "fixture_id": "micro_b",
+ "tgt_usb_ctlr": "0",
+ "host_usb_dev_node": "/dev/usbdev-p2371-2180",
+ # This parameter is optional /if/ you only have a single board
+ # attached to your host at a time.
+ "host_usb_port_path": "3-13",
+ },
+)
+
+env__dfu_configs = (
+ # eMMC, partition 1
+ {
+ "fixture_id": "emmc",
+ "alt_info": "/dfu_test.bin ext4 0 1;/dfu_dummy.bin ext4 0 1",
+ "cmd_params": "mmc 0",
+ # This value is optional.
+ # If present, it specified the set of transfer sizes tested.
+ # If missing, a default list of sizes will be used, which covers
+ # various useful corner cases.
+ # Manually specifying test sizes is useful if you wish to test 4 DFU
+ # configurations, but don't want to test every single transfer size
+ # on each, to avoid bloating the overall time taken by testing.
+ "test_sizes": (63, 64, 65),
+ },
+)
+
+b) udev rules to set permissions on devices nodes, so that sudo is not
+required. For example:
+
+ACTION=="add", SUBSYSTEM=="block", SUBSYSTEMS=="usb", KERNELS=="3-13", MODE:="666"
+
+(You may wish to change the group ID instead of setting the permissions wide
+open. All that matters is that the user ID running the test can access the
+device.)
+"""
+
+# The set of file sizes to test. These values trigger various edge-cases such
+# as one less than, equal to, and one greater than typical USB max packet
+# sizes, and similar boundary conditions.
+test_sizes_default = (
+ 64 - 1,
+ 64,
+ 64 + 1,
+ 128 - 1,
+ 128,
+ 128 + 1,
+ 960 - 1,
+ 960,
+ 960 + 1,
+ 4096 - 1,
+ 4096,
+ 4096 + 1,
+ 1024 * 1024 - 1,
+ 1024 * 1024,
+ 8 * 1024 * 1024,
+)
+
+first_usb_dev_port = None
+
+@pytest.mark.buildconfigspec('cmd_dfu')
+def test_dfu(u_boot_console, env__usb_dev_port, env__dfu_config):
+ """Test the "dfu" command; the host system must be able to enumerate a USB
+ device when "dfu" is running, various DFU transfers are tested, and the
+ USB device must disappear when "dfu" is aborted.
+
+ Args:
+ u_boot_console: A U-Boot console connection.
+ env__usb_dev_port: The single USB device-mode port specification on
+ which to run the test. See the file-level comment above for
+ details of the format.
+ env__dfu_config: The single DFU (memory region) configuration on which
+ to run the test. See the file-level comment above for details
+ of the format.
+
+ Returns:
+ Nothing.
+ """
+
+ def start_dfu():
+ """Start U-Boot's dfu shell command.
+
+ This also waits for the host-side USB enumeration process to complete.
+
+ Args:
+ None.
+
+ Returns:
+ Nothing.
+ """
+
+ fh = u_boot_utils.attempt_to_open_file(
+ env__usb_dev_port['host_usb_dev_node'])
+ if fh:
+ fh.close()
+ raise Exception('USB device present before dfu command invoked')
+
+ u_boot_console.log.action(
+ 'Starting long-running U-Boot dfu shell command')
+
+ cmd = 'setenv dfu_alt_info "%s"' % env__dfu_config['alt_info']
+ u_boot_console.run_command(cmd)
+
+ cmd = 'dfu 0 ' + env__dfu_config['cmd_params']
+ u_boot_console.run_command(cmd, wait_for_prompt=False)
+ u_boot_console.log.action('Waiting for DFU USB device to appear')
+ fh = u_boot_utils.wait_until_open_succeeds(
+ env__usb_dev_port['host_usb_dev_node'])
+ fh.close()
+
+ def stop_dfu(ignore_errors):
+ """Stop U-Boot's dfu shell command from executing.
+
+ This also waits for the host-side USB de-enumeration process to
+ complete.
+
+ Args:
+ ignore_errors: Ignore any errors. This is useful if an error has
+ already been detected, and the code is performing best-effort
+ cleanup. In this case, we do not want to mask the original
+ error by "honoring" any new errors.
+
+ Returns:
+ Nothing.
+ """
+
+ try:
+ u_boot_console.log.action(
+ 'Stopping long-running U-Boot dfu shell command')
+ u_boot_console.ctrlc()
+ u_boot_console.log.action(
+ 'Waiting for DFU USB device to disappear')
+ u_boot_utils.wait_until_file_open_fails(
+ env__usb_dev_port['host_usb_dev_node'], ignore_errors)
+ except:
+ if not ignore_errors:
+ raise
+
+ def run_dfu_util(alt_setting, fn, up_dn_load_arg):
+ """Invoke dfu-util on the host.
+
+ Args:
+ alt_setting: The DFU "alternate setting" identifier to interact
+ with.
+ fn: The host-side file name to transfer.
+ up_dn_load_arg: '-U' or '-D' depending on whether a DFU upload or
+ download operation should be performed.
+
+ Returns:
+ Nothing.
+ """
+
+ cmd = ['dfu-util', '-a', str(alt_setting), up_dn_load_arg, fn]
+ if 'host_usb_port_path' in env__usb_dev_port:
+ cmd += ['-p', env__usb_dev_port['host_usb_port_path']]
+ u_boot_utils.run_and_log(u_boot_console, cmd)
+ u_boot_console.wait_for('Ctrl+C to exit ...')
+
+ def dfu_write(alt_setting, fn):
+ """Write a file to the target board using DFU.
+
+ Args:
+ alt_setting: The DFU "alternate setting" identifier to interact
+ with.
+ fn: The host-side file name to transfer.
+
+ Returns:
+ Nothing.
+ """
+
+ run_dfu_util(alt_setting, fn, '-D')
+
+ def dfu_read(alt_setting, fn):
+ """Read a file from the target board using DFU.
+
+ Args:
+ alt_setting: The DFU "alternate setting" identifier to interact
+ with.
+ fn: The host-side file name to transfer.
+
+ Returns:
+ Nothing.
+ """
+
+ # dfu-util fails reads/uploads if the host file already exists
+ if os.path.exists(fn):
+ os.remove(fn)
+ run_dfu_util(alt_setting, fn, '-U')
+
+ def dfu_write_read_check(size):
+ """Test DFU transfers of a specific size of data
+
+ This function first writes data to the board then reads it back and
+ compares the written and read back data. Measures are taken to avoid
+ certain types of false positives.
+
+ Args:
+ size: The data size to test.
+
+ Returns:
+ Nothing.
+ """
+
+ test_f = u_boot_utils.PersistentRandomFile(u_boot_console,
+ 'dfu_%d.bin' % size, size)
+ readback_fn = u_boot_console.config.result_dir + '/dfu_readback.bin'
+
+ u_boot_console.log.action('Writing test data to DFU primary ' +
+ 'altsetting')
+ dfu_write(0, test_f.abs_fn)
+
+ u_boot_console.log.action('Writing dummy data to DFU secondary ' +
+ 'altsetting to clear DFU buffers')
+ dfu_write(1, dummy_f.abs_fn)
+
+ u_boot_console.log.action('Reading DFU primary altsetting for ' +
+ 'comparison')
+ dfu_read(0, readback_fn)
+
+ u_boot_console.log.action('Comparing written and read data')
+ written_hash = test_f.content_hash
+ read_back_hash = u_boot_utils.md5sum_file(readback_fn, size)
+ assert(written_hash == read_back_hash)
+
+ # This test may be executed against multiple USB ports. The test takes a
+ # long time, so we don't want to do the whole thing each time. Instead,
+ # execute the full test on the first USB port, and perform a very limited
+ # test on other ports. In the limited case, we solely validate that the
+ # host PC can enumerate the U-Boot USB device.
+ global first_usb_dev_port
+ if not first_usb_dev_port:
+ first_usb_dev_port = env__usb_dev_port
+ if env__usb_dev_port == first_usb_dev_port:
+ sizes = env__dfu_config.get('test_sizes', test_sizes_default)
+ else:
+ sizes = []
+
+ dummy_f = u_boot_utils.PersistentRandomFile(u_boot_console,
+ 'dfu_dummy.bin', 1024)
+
+ ignore_cleanup_errors = True
+ try:
+ start_dfu()
+
+ u_boot_console.log.action(
+ 'Overwriting DFU primary altsetting with dummy data')
+ dfu_write(0, dummy_f.abs_fn)
+
+ for size in sizes:
+ with u_boot_console.log.section('Data size %d' % size):
+ dfu_write_read_check(size)
+ # Make the status of each sub-test obvious. If the test didn't
+ # pass, an exception was thrown so this code isn't executed.
+ u_boot_console.log.status_pass('OK')
+ ignore_cleanup_errors = False
+ finally:
+ stop_dfu(ignore_cleanup_errors)
diff --git a/test/py/tests/test_env.py b/test/py/tests/test_env.py
index a3e8dd3..c41aa5a 100644
--- a/test/py/tests/test_env.py
+++ b/test/py/tests/test_env.py
@@ -10,34 +10,34 @@ import pytest
# FIXME: This might be useful for other tests;
# perhaps refactor it into ConsoleBase or some other state object?
class StateTestEnv(object):
- '''Container that represents the state of all U-Boot environment variables.
+ """Container that represents the state of all U-Boot environment variables.
This enables quick determination of existant/non-existant variable
names.
- '''
+ """
def __init__(self, u_boot_console):
- '''Initialize a new StateTestEnv object.
+ """Initialize a new StateTestEnv object.
Args:
u_boot_console: A U-Boot console.
Returns:
Nothing.
- '''
+ """
self.u_boot_console = u_boot_console
self.get_env()
self.set_var = self.get_non_existent_var()
def get_env(self):
- '''Read all current environment variables from U-Boot.
+ """Read all current environment variables from U-Boot.
Args:
None.
Returns:
Nothing.
- '''
+ """
response = self.u_boot_console.run_command('printenv')
self.env = {}
@@ -48,27 +48,27 @@ class StateTestEnv(object):
self.env[var] = value
def get_existent_var(self):
- '''Return the name of an environment variable that exists.
+ """Return the name of an environment variable that exists.
Args:
None.
Returns:
The name of an environment variable.
- '''
+ """
for var in self.env:
return var
def get_non_existent_var(self):
- '''Return the name of an environment variable that does not exist.
+ """Return the name of an environment variable that does not exist.
Args:
None.
Returns:
The name of an environment variable.
- '''
+ """
n = 0
while True:
@@ -77,63 +77,67 @@ class StateTestEnv(object):
return var
n += 1
-@pytest.fixture(scope='module')
+ste = None
+@pytest.fixture(scope='function')
def state_test_env(u_boot_console):
- '''pytest fixture to provide a StateTestEnv object to tests.'''
+ """pytest fixture to provide a StateTestEnv object to tests."""
- return StateTestEnv(u_boot_console)
+ global ste
+ if not ste:
+ ste = StateTestEnv(u_boot_console)
+ return ste
def unset_var(state_test_env, var):
- '''Unset an environment variable.
+ """Unset an environment variable.
This both executes a U-Boot shell command and updates a StateTestEnv
object.
Args:
- state_test_env: The StateTestEnv object to updata.
+ state_test_env: The StateTestEnv object to update.
var: The variable name to unset.
Returns:
Nothing.
- '''
+ """
state_test_env.u_boot_console.run_command('setenv %s' % var)
if var in state_test_env.env:
del state_test_env.env[var]
def set_var(state_test_env, var, value):
- '''Set an environment variable.
+ """Set an environment variable.
This both executes a U-Boot shell command and updates a StateTestEnv
object.
Args:
- state_test_env: The StateTestEnv object to updata.
+ state_test_env: The StateTestEnv object to update.
var: The variable name to set.
value: The value to set the variable to.
Returns:
Nothing.
- '''
+ """
state_test_env.u_boot_console.run_command('setenv %s "%s"' % (var, value))
state_test_env.env[var] = value
def validate_empty(state_test_env, var):
- '''Validate that a variable is not set, using U-Boot shell commands.
+ """Validate that a variable is not set, using U-Boot shell commands.
Args:
var: The variable name to test.
Returns:
Nothing.
- '''
+ """
response = state_test_env.u_boot_console.run_command('echo $%s' % var)
assert response == ''
def validate_set(state_test_env, var, value):
- '''Validate that a variable is set, using U-Boot shell commands.
+ """Validate that a variable is set, using U-Boot shell commands.
Args:
var: The variable name to test.
@@ -141,7 +145,7 @@ def validate_set(state_test_env, var, value):
Returns:
Nothing.
- '''
+ """
# echo does not preserve leading, internal, or trailing whitespace in the
# value. printenv does, and hence allows more complete testing.
@@ -149,20 +153,20 @@ def validate_set(state_test_env, var, value):
assert response == ('%s=%s' % (var, value))
def test_env_echo_exists(state_test_env):
- '''Test echoing a variable that exists.'''
+ """Test echoing a variable that exists."""
var = state_test_env.get_existent_var()
value = state_test_env.env[var]
validate_set(state_test_env, var, value)
def test_env_echo_non_existent(state_test_env):
- '''Test echoing a variable that doesn't exist.'''
+ """Test echoing a variable that doesn't exist."""
var = state_test_env.set_var
validate_empty(state_test_env, var)
def test_env_printenv_non_existent(state_test_env):
- '''Test printenv error message for non-existant variables.'''
+ """Test printenv error message for non-existant variables."""
var = state_test_env.set_var
c = state_test_env.u_boot_console
@@ -171,14 +175,14 @@ def test_env_printenv_non_existent(state_test_env):
assert(response == '## Error: "%s" not defined' % var)
def test_env_unset_non_existent(state_test_env):
- '''Test unsetting a nonexistent variable.'''
+ """Test unsetting a nonexistent variable."""
var = state_test_env.get_non_existent_var()
unset_var(state_test_env, var)
validate_empty(state_test_env, var)
def test_env_set_non_existent(state_test_env):
- '''Test set a non-existant variable.'''
+ """Test set a non-existant variable."""
var = state_test_env.set_var
value = 'foo'
@@ -186,7 +190,7 @@ def test_env_set_non_existent(state_test_env):
validate_set(state_test_env, var, value)
def test_env_set_existing(state_test_env):
- '''Test setting an existant variable.'''
+ """Test setting an existant variable."""
var = state_test_env.set_var
value = 'bar'
@@ -194,14 +198,14 @@ def test_env_set_existing(state_test_env):
validate_set(state_test_env, var, value)
def test_env_unset_existing(state_test_env):
- '''Test unsetting a variable.'''
+ """Test unsetting a variable."""
var = state_test_env.set_var
unset_var(state_test_env, var)
validate_empty(state_test_env, var)
def test_env_expansion_spaces(state_test_env):
- '''Test expanding a variable that contains a space in its value.'''
+ """Test expanding a variable that contains a space in its value."""
var_space = None
var_test = None
diff --git a/test/py/tests/test_help.py b/test/py/tests/test_help.py
index 894f3b5..420090c 100644
--- a/test/py/tests/test_help.py
+++ b/test/py/tests/test_help.py
@@ -4,6 +4,6 @@
# SPDX-License-Identifier: GPL-2.0
def test_help(u_boot_console):
- '''Test that the "help" command can be executed.'''
+ """Test that the "help" command can be executed."""
u_boot_console.run_command('help')
diff --git a/test/py/tests/test_hush_if_test.py b/test/py/tests/test_hush_if_test.py
index cf4c3ae..8b88425 100644
--- a/test/py/tests/test_hush_if_test.py
+++ b/test/py/tests/test_hush_if_test.py
@@ -95,7 +95,7 @@ subtests = (
)
def exec_hush_if(u_boot_console, expr, result):
- '''Execute a shell "if" command, and validate its result.'''
+ """Execute a shell "if" command, and validate its result."""
cmd = 'if ' + expr + '; then echo true; else echo false; fi'
response = u_boot_console.run_command(cmd)
@@ -103,7 +103,7 @@ def exec_hush_if(u_boot_console, expr, result):
@pytest.mark.buildconfigspec('sys_hush_parser')
def test_hush_if_test_setup(u_boot_console):
- '''Set up environment variables used during the "if" tests.'''
+ """Set up environment variables used during the "if" tests."""
u_boot_console.run_command('setenv ut_var_nonexistent')
u_boot_console.run_command('setenv ut_var_exists 1')
@@ -111,13 +111,13 @@ def test_hush_if_test_setup(u_boot_console):
@pytest.mark.buildconfigspec('sys_hush_parser')
@pytest.mark.parametrize('expr,result', subtests)
def test_hush_if_test(u_boot_console, expr, result):
- '''Test a single "if test" condition.'''
+ """Test a single "if test" condition."""
exec_hush_if(u_boot_console, expr, result)
@pytest.mark.buildconfigspec('sys_hush_parser')
def test_hush_if_test_teardown(u_boot_console):
- '''Clean up environment variables used during the "if" tests.'''
+ """Clean up environment variables used during the "if" tests."""
u_boot_console.run_command('setenv ut_var_exists')
@@ -126,7 +126,7 @@ def test_hush_if_test_teardown(u_boot_console):
# Of those, only UMS currently allows file removal though.
@pytest.mark.boardspec('sandbox')
def test_hush_if_test_host_file_exists(u_boot_console):
- '''Test the "if test -e" shell command.'''
+ """Test the "if test -e" shell command."""
test_file = u_boot_console.config.result_dir + \
'/creating_this_file_breaks_u_boot_tests'
diff --git a/test/py/tests/test_md.py b/test/py/tests/test_md.py
index 94603c7..5fe2582 100644
--- a/test/py/tests/test_md.py
+++ b/test/py/tests/test_md.py
@@ -4,13 +4,14 @@
# SPDX-License-Identifier: GPL-2.0
import pytest
+import u_boot_utils
@pytest.mark.buildconfigspec('cmd_memory')
def test_md(u_boot_console):
- '''Test that md reads memory as expected, and that memory can be modified
- using the mw command.'''
+ """Test that md reads memory as expected, and that memory can be modified
+ using the mw command."""
- ram_base = u_boot_console.find_ram_base()
+ ram_base = u_boot_utils.find_ram_base(u_boot_console)
addr = '%08x' % ram_base
val = 'a5f09876'
expected_response = addr + ': ' + val
@@ -23,10 +24,10 @@ def test_md(u_boot_console):
@pytest.mark.buildconfigspec('cmd_memory')
def test_md_repeat(u_boot_console):
- '''Test command repeat (via executing an empty command) operates correctly
- for "md"; the command must repeat and dump an incrementing address.'''
+ """Test command repeat (via executing an empty command) operates correctly
+ for "md"; the command must repeat and dump an incrementing address."""
- ram_base = u_boot_console.find_ram_base()
+ ram_base = u_boot_utils.find_ram_base(u_boot_console)
addr_base = '%08x' % ram_base
words = 0x10
addr_repeat = '%08x' % (ram_base + (words * 4))
diff --git a/test/py/tests/test_net.py b/test/py/tests/test_net.py
new file mode 100644
index 0000000..07393eb
--- /dev/null
+++ b/test/py/tests/test_net.py
@@ -0,0 +1,155 @@
+# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved.
+#
+# SPDX-License-Identifier: GPL-2.0
+
+# Test various network-related functionality, such as the dhcp, ping, and
+# tftpboot commands.
+
+import pytest
+import u_boot_utils
+
+"""
+Note: This test relies on boardenv_* containing configuration values to define
+which the network environment available for testing. Without this, this test
+will be automatically skipped.
+
+For example:
+
+# Boolean indicating whether the Ethernet device is attached to USB, and hence
+# USB enumeration needs to be performed prior to network tests.
+# This variable may be omitted if its value is False.
+env__net_uses_usb = False
+
+# Boolean indicating whether the Ethernet device is attached to PCI, and hence
+# PCI enumeration needs to be performed prior to network tests.
+# This variable may be omitted if its value is False.
+env__net_uses_pci = True
+
+# True if a DHCP server is attached to the network, and should be tested.
+# If DHCP testing is not possible or desired, this variable may be omitted or
+# set to False.
+env__net_dhcp_server = True
+
+# A list of environment variables that should be set in order to configure a
+# static IP. If solely relying on DHCP, this variable may be omitted or set to
+# an empty list.
+env__net_static_env_vars = [
+ ("ipaddr", "10.0.0.100"),
+ ("netmask", "255.255.255.0"),
+ ("serverip", "10.0.0.1"),
+]
+
+# Details regarding a file that may be read from a TFTP server. This variable
+# may be omitted or set to None if TFTP testing is not possible or desired.
+env__net_tftp_readable_file = {
+ "fn": "ubtest-readable.bin",
+ "size": 5058624,
+ "crc32": "c2244b26",
+}
+"""
+
+net_set_up = False
+
+def test_net_pre_commands(u_boot_console):
+ """Execute any commands required to enable network hardware.
+
+ These commands are provided by the boardenv_* file; see the comment at the
+ beginning of this file.
+ """
+
+ init_usb = u_boot_console.config.env.get('env__net_uses_usb', False)
+ if init_usb:
+ u_boot_console.run_command('usb start')
+
+ init_pci = u_boot_console.config.env.get('env__net_uses_pci', False)
+ if init_pci:
+ u_boot_console.run_command('pci enum')
+
+@pytest.mark.buildconfigspec('cmd_dhcp')
+def test_net_dhcp(u_boot_console):
+ """Test the dhcp command.
+
+ The boardenv_* file may be used to enable/disable this test; see the
+ comment at the beginning of this file.
+ """
+
+ test_dhcp = u_boot_console.config.env.get('env__net_dhcp_server', False)
+ if not test_dhcp:
+ pytest.skip('No DHCP server available')
+
+ u_boot_console.run_command('setenv autoload no')
+ output = u_boot_console.run_command('dhcp')
+ assert 'DHCP client bound to address ' in output
+
+ global net_set_up
+ net_set_up = True
+
+@pytest.mark.buildconfigspec('net')
+def test_net_setup_static(u_boot_console):
+ """Set up a static IP configuration.
+
+ The configuration is provided by the boardenv_* file; see the comment at
+ the beginning of this file.
+ """
+
+ env_vars = u_boot_console.config.env.get('env__net_static_env_vars', None)
+ if not env_vars:
+ pytest.skip('No static network configuration is defined')
+
+ for (var, val) in env_vars:
+ u_boot_console.run_command('setenv %s %s' % (var, val))
+
+ global net_set_up
+ net_set_up = True
+
+@pytest.mark.buildconfigspec('cmd_ping')
+def test_net_ping(u_boot_console):
+ """Test the ping command.
+
+ The $serverip (as set up by either test_net_dhcp or test_net_setup_static)
+ is pinged. The test validates that the host is alive, as reported by the
+ ping command's output.
+ """
+
+ if not net_set_up:
+ pytest.skip('Network not initialized')
+
+ output = u_boot_console.run_command('ping $serverip')
+ assert 'is alive' in output
+
+@pytest.mark.buildconfigspec('cmd_net')
+def test_net_tftpboot(u_boot_console):
+ """Test the tftpboot command.
+
+ A file is downloaded from the TFTP server, its size and optionally its
+ CRC32 are validated.
+
+ The details of the file to download are provided by the boardenv_* file;
+ see the comment at the beginning of this file.
+ """
+
+ if not net_set_up:
+ pytest.skip('Network not initialized')
+
+ f = u_boot_console.config.env.get('env__net_tftp_readable_file', None)
+ if not f:
+ pytest.skip('No TFTP readable file to read')
+
+ addr = u_boot_utils.find_ram_base(u_boot_console)
+ fn = f['fn']
+ output = u_boot_console.run_command('tftpboot %x %s' % (addr, fn))
+ expected_text = 'Bytes transferred = '
+ sz = f.get('size', None)
+ if sz:
+ expected_text += '%d' % sz
+ assert expected_text in output
+
+ expected_crc = f.get('crc32', None)
+ if not expected_crc:
+ return
+
+ if u_boot_console.config.buildconfig.get('config_cmd_crc32', 'n') != 'y':
+ return
+
+ output = u_boot_console.run_command('crc32 %x $filesize' % addr)
+ assert expected_crc in output
diff --git a/test/py/tests/test_sandbox_exit.py b/test/py/tests/test_sandbox_exit.py
index 2aa8eb4..d1aa308 100644
--- a/test/py/tests/test_sandbox_exit.py
+++ b/test/py/tests/test_sandbox_exit.py
@@ -9,16 +9,14 @@ import signal
@pytest.mark.boardspec('sandbox')
@pytest.mark.buildconfigspec('reset')
def test_reset(u_boot_console):
- '''Test that the "reset" command exits sandbox process.'''
+ """Test that the "reset" command exits sandbox process."""
u_boot_console.run_command('reset', wait_for_prompt=False)
assert(u_boot_console.validate_exited())
- u_boot_console.ensure_spawned()
@pytest.mark.boardspec('sandbox')
def test_ctrl_c(u_boot_console):
- '''Test that sending SIGINT to sandbox causes it to exit.'''
+ """Test that sending SIGINT to sandbox causes it to exit."""
u_boot_console.kill(signal.SIGINT)
assert(u_boot_console.validate_exited())
- u_boot_console.ensure_spawned()
diff --git a/test/py/tests/test_shell_basics.py b/test/py/tests/test_shell_basics.py
index 719ce61..702e5e2 100644
--- a/test/py/tests/test_shell_basics.py
+++ b/test/py/tests/test_shell_basics.py
@@ -5,13 +5,13 @@
# Test basic shell functionality, such as commands separate by semi-colons.
def test_shell_execute(u_boot_console):
- '''Test any shell command.'''
+ """Test any shell command."""
response = u_boot_console.run_command('echo hello')
assert response.strip() == 'hello'
def test_shell_semicolon_two(u_boot_console):
- '''Test two shell commands separate by a semi-colon.'''
+ """Test two shell commands separate by a semi-colon."""
cmd = 'echo hello; echo world'
response = u_boot_console.run_command(cmd)
@@ -19,8 +19,8 @@ def test_shell_semicolon_two(u_boot_console):
assert response.index('hello') < response.index('world')
def test_shell_semicolon_three(u_boot_console):
- '''Test three shell commands separate by a semi-colon, with variable
- expansion dependencies between them.'''
+ """Test three shell commands separate by a semi-colon, with variable
+ expansion dependencies between them."""
cmd = 'setenv list 1; setenv list ${list}2; setenv list ${list}3; ' + \
'echo ${list}'
@@ -29,9 +29,9 @@ def test_shell_semicolon_three(u_boot_console):
u_boot_console.run_command('setenv list')
def test_shell_run(u_boot_console):
- '''Test the "run" shell command.'''
+ """Test the "run" shell command."""
- u_boot_console.run_command('setenv foo \"setenv monty 1; setenv python 2\"')
+ u_boot_console.run_command('setenv foo "setenv monty 1; setenv python 2"')
u_boot_console.run_command('run foo')
response = u_boot_console.run_command('echo $monty')
assert response.strip() == '1'
diff --git a/test/py/tests/test_sleep.py b/test/py/tests/test_sleep.py
index 64f1ddf..74add89 100644
--- a/test/py/tests/test_sleep.py
+++ b/test/py/tests/test_sleep.py
@@ -6,12 +6,8 @@ import pytest
import time
def test_sleep(u_boot_console):
- '''Test the sleep command, and validate that it sleeps for approximately
- the correct amount of time.'''
-
- # Do this before we time anything, to make sure U-Boot is already running.
- # Otherwise, the system boot time is included in the time measurement.
- u_boot_console.ensure_spawned()
+ """Test the sleep command, and validate that it sleeps for approximately
+ the correct amount of time."""
# 3s isn't too long, but is enough to cross a few second boundaries.
sleep_time = 3
diff --git a/test/py/tests/test_ums.py b/test/py/tests/test_ums.py
index a137221..8c3ee2b 100644
--- a/test/py/tests/test_ums.py
+++ b/test/py/tests/test_ums.py
@@ -2,28 +2,58 @@
#
# SPDX-License-Identifier: GPL-2.0
-# Test U-Boot's "ums" command. At present, this test only ensures that a UMS
-# device can be enumerated by the host/test machine. In the future, this test
-# should be enhanced to validate disk IO.
+# Test U-Boot's "ums" command. The test starts UMS in U-Boot, waits for USB
+# device enumeration on the host, reads a small block of data from the UMS
+# block device, optionally mounts a partition and performs filesystem-based
+# read/write tests, and finally aborts the "ums" command in U-Boot.
import os
+import os.path
import pytest
+import re
import time
+import u_boot_utils
-'''
+"""
Note: This test relies on:
a) boardenv_* to contain configuration values to define which USB ports are
available for testing. Without this, this test will be automatically skipped.
For example:
+# Leave this list empty if you have no block_devs below with writable
+# partitions defined.
+env__mount_points = (
+ "/mnt/ubtest-mnt-p2371-2180-na",
+)
+
env__usb_dev_ports = (
- {'tgt_usb_ctlr': '0', 'host_ums_dev_node': '/dev/disk/by-path/pci-0000:00:14.0-usb-0:13:1.0-scsi-0:0:0:0'},
+ {
+ "fixture_id": "micro_b",
+ "tgt_usb_ctlr": "0",
+ "host_ums_dev_node": "/dev/disk/by-path/pci-0000:00:14.0-usb-0:13:1.0-scsi-0:0:0:0",
+ },
)
env__block_devs = (
- {'type': 'mmc', 'id': '0'}, # eMMC; always present
- {'type': 'mmc', 'id': '1'}, # SD card; present since I plugged one in
+ # eMMC; always present
+ {
+ "fixture_id": "emmc",
+ "type": "mmc",
+ "id": "0",
+ # The following two properties are optional.
+ # If present, the partition will be mounted and a file written-to and
+ # read-from it. If missing, only a simple block read test will be
+ # performed.
+ "writable_fs_partition": 1,
+ "writable_fs_subdir": "tmp/",
+ },
+ # SD card; present since I plugged one in
+ {
+ "fixture_id": "sd",
+ "type": "mmc",
+ "id": "1"
+ },
)
b) udev rules to set permissions on devices nodes, so that sudo is not
@@ -34,47 +64,42 @@ ACTION=="add", SUBSYSTEM=="block", SUBSYSTEMS=="usb", KERNELS=="3-13", MODE:="66
(You may wish to change the group ID instead of setting the permissions wide
open. All that matters is that the user ID running the test can access the
device.)
-'''
-def open_ums_device(host_ums_dev_node):
- '''Attempt to open a device node, returning either the opened file handle,
- or None on any error.'''
+c) /etc/fstab entries to allow the block device to be mounted without requiring
+root permissions. For example:
- try:
- return open(host_ums_dev_node, 'rb')
- except:
- return None
-
-def wait_for_ums_device(host_ums_dev_node):
- '''Continually attempt to open the device node exported by the "ums"
- command, and either return the opened file handle, or raise an exception
- after a timeout.'''
-
- for i in xrange(100):
- fh = open_ums_device(host_ums_dev_node)
- if fh:
- return fh
- time.sleep(0.1)
- raise Exception('UMS device did not appear')
-
-def wait_for_ums_device_gone(host_ums_dev_node):
- '''Continually attempt to open the device node exported by the "ums"
- command, and either return once the device has disappeared, or raise an
- exception if it does not before a timeout occurs.'''
-
- for i in xrange(100):
- fh = open_ums_device(host_ums_dev_node)
- if not fh:
- return
- fh.close()
- time.sleep(0.1)
- raise Exception('UMS device did not disappear')
+/dev/disk/by-path/pci-0000:00:14.0-usb-0:13:1.0-scsi-0:0:0:0-part1 /mnt/ubtest-mnt-p2371-2180-na ext4 noauto,user,nosuid,nodev
+
+This entry is only needed if any block_devs above contain a
+writable_fs_partition value.
+"""
@pytest.mark.buildconfigspec('cmd_usb_mass_storage')
def test_ums(u_boot_console, env__usb_dev_port, env__block_devs):
- '''Test the "ums" command; the host system must be able to enumerate a UMS
- device when "ums" is running, and this device must disappear when "ums" is
- aborted.'''
+ """Test the "ums" command; the host system must be able to enumerate a UMS
+ device when "ums" is running, block and optionally file I/O are tested,
+ and this device must disappear when "ums" is aborted.
+
+ Args:
+ u_boot_console: A U-Boot console connection.
+ env__usb_dev_port: The single USB device-mode port specification on
+ which to run the test. See the file-level comment above for
+ details of the format.
+ env__block_devs: The list of block devices that the target U-Boot
+ device has attached. See the file-level comment above for details
+ of the format.
+
+ Returns:
+ Nothing.
+ """
+
+ have_writable_fs_partition = 'writable_fs_partition' in env__block_devs[0]
+ if not have_writable_fs_partition:
+ # If 'writable_fs_subdir' is missing, we'll skip all parts of the
+ # testing which mount filesystems.
+ u_boot_console.log.warning(
+ 'boardenv missing "writable_fs_partition"; ' +
+ 'UMS testing will be limited.')
tgt_usb_ctlr = env__usb_dev_port['tgt_usb_ctlr']
host_ums_dev_node = env__usb_dev_port['host_ums_dev_node']
@@ -84,11 +109,129 @@ def test_ums(u_boot_console, env__usb_dev_port, env__block_devs):
# device list here. We'll test each block device somewhere else.
tgt_dev_type = env__block_devs[0]['type']
tgt_dev_id = env__block_devs[0]['id']
+ if have_writable_fs_partition:
+ mount_point = u_boot_console.config.env['env__mount_points'][0]
+ mount_subdir = env__block_devs[0]['writable_fs_subdir']
+ part_num = env__block_devs[0]['writable_fs_partition']
+ host_ums_part_node = '%s-part%d' % (host_ums_dev_node, part_num)
+ else:
+ host_ums_part_node = host_ums_dev_node
+
+ test_f = u_boot_utils.PersistentRandomFile(u_boot_console, 'ums.bin',
+ 1024 * 1024);
+ if have_writable_fs_partition:
+ mounted_test_fn = mount_point + '/' + mount_subdir + test_f.fn
+
+ def start_ums():
+ """Start U-Boot's ums shell command.
+
+ This also waits for the host-side USB enumeration process to complete.
+
+ Args:
+ None.
+
+ Returns:
+ Nothing.
+ """
+
+ u_boot_console.log.action(
+ 'Starting long-running U-Boot ums shell command')
+ cmd = 'ums %s %s %s' % (tgt_usb_ctlr, tgt_dev_type, tgt_dev_id)
+ u_boot_console.run_command(cmd, wait_for_prompt=False)
+ u_boot_console.wait_for(re.compile('UMS: LUN.*[\r\n]'))
+ fh = u_boot_utils.wait_until_open_succeeds(host_ums_part_node)
+ u_boot_console.log.action('Reading raw data from UMS device')
+ fh.read(4096)
+ fh.close()
+
+ def mount():
+ """Mount the block device that U-Boot exports.
+
+ Args:
+ None.
+
+ Returns:
+ Nothing.
+ """
+
+ u_boot_console.log.action('Mounting exported UMS device')
+ cmd = ('/bin/mount', host_ums_part_node)
+ u_boot_utils.run_and_log(u_boot_console, cmd)
+
+ def umount(ignore_errors):
+ """Unmount the block device that U-Boot exports.
- cmd = 'ums %s %s %s' % (tgt_usb_ctlr, tgt_dev_type, tgt_dev_id)
- u_boot_console.run_command('ums 0 mmc 0', wait_for_prompt=False)
- fh = wait_for_ums_device(host_ums_dev_node)
- fh.read(4096)
- fh.close()
- u_boot_console.ctrlc()
- wait_for_ums_device_gone(host_ums_dev_node)
+ Args:
+ ignore_errors: Ignore any errors. This is useful if an error has
+ already been detected, and the code is performing best-effort
+ cleanup. In this case, we do not want to mask the original
+ error by "honoring" any new errors.
+
+ Returns:
+ Nothing.
+ """
+
+ u_boot_console.log.action('Unmounting UMS device')
+ cmd = ('/bin/umount', host_ums_part_node)
+ u_boot_utils.run_and_log(u_boot_console, cmd, ignore_errors)
+
+ def stop_ums(ignore_errors):
+ """Stop U-Boot's ums shell command from executing.
+
+ This also waits for the host-side USB de-enumeration process to
+ complete.
+
+ Args:
+ ignore_errors: Ignore any errors. This is useful if an error has
+ already been detected, and the code is performing best-effort
+ cleanup. In this case, we do not want to mask the original
+ error by "honoring" any new errors.
+
+ Returns:
+ Nothing.
+ """
+
+ u_boot_console.log.action(
+ 'Stopping long-running U-Boot ums shell command')
+ u_boot_console.ctrlc()
+ u_boot_utils.wait_until_file_open_fails(host_ums_part_node,
+ ignore_errors)
+
+ ignore_cleanup_errors = True
+ try:
+ start_ums()
+ if not have_writable_fs_partition:
+ # Skip filesystem-based testing if not configured
+ return
+ try:
+ mount()
+ u_boot_console.log.action('Writing test file via UMS')
+ cmd = ('rm', '-f', mounted_test_fn)
+ u_boot_utils.run_and_log(u_boot_console, cmd)
+ if os.path.exists(mounted_test_fn):
+ raise Exception('Could not rm target UMS test file')
+ cmd = ('cp', test_f.abs_fn, mounted_test_fn)
+ u_boot_utils.run_and_log(u_boot_console, cmd)
+ ignore_cleanup_errors = False
+ finally:
+ umount(ignore_errors=ignore_cleanup_errors)
+ finally:
+ stop_ums(ignore_errors=ignore_cleanup_errors)
+
+ ignore_cleanup_errors = True
+ try:
+ start_ums()
+ try:
+ mount()
+ u_boot_console.log.action('Reading test file back via UMS')
+ read_back_hash = u_boot_utils.md5sum_file(mounted_test_fn)
+ cmd = ('rm', '-f', mounted_test_fn)
+ u_boot_utils.run_and_log(u_boot_console, cmd)
+ ignore_cleanup_errors = False
+ finally:
+ umount(ignore_errors=ignore_cleanup_errors)
+ finally:
+ stop_ums(ignore_errors=ignore_cleanup_errors)
+
+ written_hash = test_f.content_hash
+ assert(written_hash == read_back_hash)
diff --git a/test/py/tests/test_unknown_cmd.py b/test/py/tests/test_unknown_cmd.py
index 2de93e0..c27ab49 100644
--- a/test/py/tests/test_unknown_cmd.py
+++ b/test/py/tests/test_unknown_cmd.py
@@ -4,8 +4,8 @@
# SPDX-License-Identifier: GPL-2.0
def test_unknown_command(u_boot_console):
- '''Test that executing an unknown command causes U-Boot to print an
- error.'''
+ """Test that executing an unknown command causes U-Boot to print an
+ error."""
# The "unknown command" error is actively expected here,
# so error detection for it is disabled.
diff --git a/test/py/u_boot_console_base.py b/test/py/u_boot_console_base.py
index 520f9a9..392f8cb 100644
--- a/test/py/u_boot_console_base.py
+++ b/test/py/u_boot_console_base.py
@@ -14,6 +14,7 @@ import os
import pytest
import re
import sys
+import u_boot_spawn
# Regexes for text we expect U-Boot to send to the console.
pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}-[^\r\n]*)')
@@ -21,14 +22,27 @@ pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}-[^\r\n]*)')
pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'')
pattern_error_notification = re.compile('## Error: ')
+pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###')
+
+PAT_ID = 0
+PAT_RE = 1
+
+bad_pattern_defs = (
+ ('spl_signon', pattern_u_boot_spl_signon),
+ ('main_signon', pattern_u_boot_main_signon),
+ ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
+ ('unknown_command', pattern_unknown_command),
+ ('error_notification', pattern_error_notification),
+ ('error_please_reset', pattern_error_please_reset),
+)
class ConsoleDisableCheck(object):
- '''Context manager (for Python's with statement) that temporarily disables
+ """Context manager (for Python's with statement) that temporarily disables
the specified console output error check. This is useful when deliberately
executing a command that is known to trigger one of the error checks, in
order to test that the error condition is actually raised. This class is
used internally by ConsoleBase::disable_check(); it is not intended for
- direct usage.'''
+ direct usage."""
def __init__(self, console, check_type):
self.console = console
@@ -36,18 +50,20 @@ class ConsoleDisableCheck(object):
def __enter__(self):
self.console.disable_check_count[self.check_type] += 1
+ self.console.eval_bad_patterns()
def __exit__(self, extype, value, traceback):
self.console.disable_check_count[self.check_type] -= 1
+ self.console.eval_bad_patterns()
class ConsoleBase(object):
- '''The interface through which test functions interact with the U-Boot
+ """The interface through which test functions interact with the U-Boot
console. This primarily involves executing shell commands, capturing their
results, and checking for common error conditions. Some common utilities
- are also provided too.'''
+ are also provided too."""
def __init__(self, log, config, max_fifo_fill):
- '''Initialize a U-Boot console connection.
+ """Initialize a U-Boot console connection.
Can only usefully be called by sub-classes.
@@ -64,7 +80,7 @@ class ConsoleBase(object):
Returns:
Nothing.
- '''
+ """
self.log = log
self.config = config
@@ -76,19 +92,20 @@ class ConsoleBase(object):
self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
self.prompt_escaped = re.escape(self.prompt)
self.p = None
- self.disable_check_count = {
- 'spl_signon': 0,
- 'main_signon': 0,
- 'unknown_command': 0,
- 'error_notification': 0,
- }
+ self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
+ self.eval_bad_patterns()
self.at_prompt = False
self.at_prompt_logevt = None
- self.ram_base = None
+
+ def eval_bad_patterns(self):
+ self.bad_patterns = [pat[PAT_RE] for pat in bad_pattern_defs \
+ if self.disable_check_count[pat[PAT_ID]] == 0]
+ self.bad_pattern_ids = [pat[PAT_ID] for pat in bad_pattern_defs \
+ if self.disable_check_count[pat[PAT_ID]] == 0]
def close(self):
- '''Terminate the connection to the U-Boot console.
+ """Terminate the connection to the U-Boot console.
This function is only useful once all interaction with U-Boot is
complete. Once this function is called, data cannot be sent to or
@@ -99,7 +116,7 @@ class ConsoleBase(object):
Returns:
Nothing.
- '''
+ """
if self.p:
self.p.close()
@@ -107,7 +124,7 @@ class ConsoleBase(object):
def run_command(self, cmd, wait_for_echo=True, send_nl=True,
wait_for_prompt=True):
- '''Execute a command via the U-Boot console.
+ """Execute a command via the U-Boot console.
The command is always sent to U-Boot.
@@ -142,29 +159,12 @@ class ConsoleBase(object):
The output from U-Boot during command execution. In other
words, the text U-Boot emitted between the point it echod the
command string and emitted the subsequent command prompts.
- '''
-
- self.ensure_spawned()
+ """
if self.at_prompt and \
self.at_prompt_logevt != self.logstream.logfile.cur_evt:
self.logstream.write(self.prompt, implicit=True)
- bad_patterns = []
- bad_pattern_ids = []
- if (self.disable_check_count['spl_signon'] == 0 and
- self.u_boot_spl_signon):
- bad_patterns.append(self.u_boot_spl_signon_escaped)
- bad_pattern_ids.append('SPL signon')
- if self.disable_check_count['main_signon'] == 0:
- bad_patterns.append(self.u_boot_main_signon_escaped)
- bad_pattern_ids.append('U-Boot main signon')
- if self.disable_check_count['unknown_command'] == 0:
- bad_patterns.append(pattern_unknown_command)
- bad_pattern_ids.append('Unknown command')
- if self.disable_check_count['error_notification'] == 0:
- bad_patterns.append(pattern_error_notification)
- bad_pattern_ids.append('Error notification')
try:
self.at_prompt = False
if send_nl:
@@ -178,18 +178,18 @@ class ConsoleBase(object):
continue
chunk = re.escape(chunk)
chunk = chunk.replace('\\\n', '[\r\n]')
- m = self.p.expect([chunk] + bad_patterns)
+ m = self.p.expect([chunk] + self.bad_patterns)
if m != 0:
self.at_prompt = False
raise Exception('Bad pattern found on console: ' +
- bad_pattern_ids[m - 1])
+ self.bad_pattern_ids[m - 1])
if not wait_for_prompt:
return
- m = self.p.expect([self.prompt_escaped] + bad_patterns)
+ m = self.p.expect([self.prompt_escaped] + self.bad_patterns)
if m != 0:
self.at_prompt = False
raise Exception('Bad pattern found on console: ' +
- bad_pattern_ids[m - 1])
+ self.bad_pattern_ids[m - 1])
self.at_prompt = True
self.at_prompt_logevt = self.logstream.logfile.cur_evt
# Only strip \r\n; space/TAB might be significant if testing
@@ -201,7 +201,7 @@ class ConsoleBase(object):
raise
def ctrlc(self):
- '''Send a CTRL-C character to U-Boot.
+ """Send a CTRL-C character to U-Boot.
This is useful in order to stop execution of long-running synchronous
commands such as "ums".
@@ -211,12 +211,72 @@ class ConsoleBase(object):
Returns:
Nothing.
- '''
+ """
+ self.log.action('Sending Ctrl-C')
self.run_command(chr(3), wait_for_echo=False, send_nl=False)
+ def wait_for(self, text):
+ """Wait for a pattern to be emitted by U-Boot.
+
+ This is useful when a long-running command such as "dfu" is executing,
+ and it periodically emits some text that should show up at a specific
+ location in the log file.
+
+ Args:
+ text: The text to wait for; either a string (containing raw text,
+ not a regular expression) or an re object.
+
+ Returns:
+ Nothing.
+ """
+
+ if type(text) == type(''):
+ text = re.escape(text)
+ m = self.p.expect([text] + self.bad_patterns)
+ if m != 0:
+ raise Exception('Bad pattern found on console: ' +
+ self.bad_pattern_ids[m - 1])
+
+ def drain_console(self):
+ """Read from and log the U-Boot console for a short time.
+
+ U-Boot's console output is only logged when the test code actively
+ waits for U-Boot to emit specific data. There are cases where tests
+ can fail without doing this. For example, if a test asks U-Boot to
+ enable USB device mode, then polls until a host-side device node
+ exists. In such a case, it is useful to log U-Boot's console output
+ in case U-Boot printed clues as to why the host-side even did not
+ occur. This function will do that.
+
+ Args:
+ None.
+
+ Returns:
+ Nothing.
+ """
+
+ # If we are already not connected to U-Boot, there's nothing to drain.
+ # This should only happen when a previous call to run_command() or
+ # wait_for() failed (and hence the output has already been logged), or
+ # the system is shutting down.
+ if not self.p:
+ return
+
+ orig_timeout = self.p.timeout
+ try:
+ # Drain the log for a relatively short time.
+ self.p.timeout = 1000
+ # Wait for something U-Boot will likely never send. This will
+ # cause the console output to be read and logged.
+ self.p.expect(['This should never match U-Boot output'])
+ except u_boot_spawn.Timeout:
+ pass
+ finally:
+ self.p.timeout = orig_timeout
+
def ensure_spawned(self):
- '''Ensure a connection to a correctly running U-Boot instance.
+ """Ensure a connection to a correctly running U-Boot instance.
This may require spawning a new Sandbox process or resetting target
hardware, as defined by the implementation sub-class.
@@ -228,7 +288,7 @@ class ConsoleBase(object):
Returns:
Nothing.
- '''
+ """
if self.p:
return
@@ -243,26 +303,30 @@ class ConsoleBase(object):
self.p.timeout = 30000
self.p.logfile_read = self.logstream
if self.config.buildconfig.get('CONFIG_SPL', False) == 'y':
- self.p.expect([pattern_u_boot_spl_signon])
- self.u_boot_spl_signon = self.p.after
- self.u_boot_spl_signon_escaped = re.escape(self.p.after)
- else:
- self.u_boot_spl_signon = None
- self.p.expect([pattern_u_boot_main_signon])
- self.u_boot_main_signon = self.p.after
- self.u_boot_main_signon_escaped = re.escape(self.p.after)
- build_idx = self.u_boot_main_signon.find(', Build:')
+ m = self.p.expect([pattern_u_boot_spl_signon] + self.bad_patterns)
+ if m != 0:
+ raise Exception('Bad pattern found on console: ' +
+ self.bad_pattern_ids[m - 1])
+ m = self.p.expect([pattern_u_boot_main_signon] + self.bad_patterns)
+ if m != 0:
+ raise Exception('Bad pattern found on console: ' +
+ self.bad_pattern_ids[m - 1])
+ signon = self.p.after
+ build_idx = signon.find(', Build:')
if build_idx == -1:
- self.u_boot_version_string = self.u_boot_main_signon
+ self.u_boot_version_string = signon
else:
- self.u_boot_version_string = self.u_boot_main_signon[:build_idx]
+ self.u_boot_version_string = signon[:build_idx]
while True:
- match = self.p.expect([self.prompt_escaped,
- pattern_stop_autoboot_prompt])
- if match == 1:
+ m = self.p.expect([self.prompt_escaped,
+ pattern_stop_autoboot_prompt] + self.bad_patterns)
+ if m == 0:
+ break
+ if m == 1:
self.p.send(chr(3)) # CTRL-C
continue
- break
+ raise Exception('Bad pattern found on console: ' +
+ self.bad_pattern_ids[m - 2])
self.at_prompt = True
self.at_prompt_logevt = self.logstream.logfile.cur_evt
except Exception as ex:
@@ -271,7 +335,7 @@ class ConsoleBase(object):
raise
def cleanup_spawn(self):
- '''Shut down all interaction with the U-Boot instance.
+ """Shut down all interaction with the U-Boot instance.
This is used when an error is detected prior to re-establishing a
connection with a fresh U-Boot instance.
@@ -283,7 +347,7 @@ class ConsoleBase(object):
Returns:
Nothing.
- '''
+ """
try:
if self.p:
@@ -293,7 +357,7 @@ class ConsoleBase(object):
self.p = None
def validate_version_string_in_text(self, text):
- '''Assert that a command's output includes the U-Boot signon message.
+ """Assert that a command's output includes the U-Boot signon message.
This is primarily useful for validating the "version" command without
duplicating the signon text regex in a test function.
@@ -303,12 +367,12 @@ class ConsoleBase(object):
Returns:
Nothing. An exception is raised if the validation fails.
- '''
+ """
assert(self.u_boot_version_string in text)
def disable_check(self, check_type):
- '''Temporarily disable an error check of U-Boot's output.
+ """Temporarily disable an error check of U-Boot's output.
Create a new context manager (for use with the "with" statement) which
temporarily disables a particular console output error check.
@@ -319,42 +383,6 @@ class ConsoleBase(object):
Returns:
A context manager object.
- '''
+ """
return ConsoleDisableCheck(self, check_type)
-
- def find_ram_base(self):
- '''Find the running U-Boot's RAM location.
-
- Probe the running U-Boot to determine the address of the first bank
- of RAM. This is useful for tests that test reading/writing RAM, or
- load/save files that aren't associated with some standard address
- typically represented in an environment variable such as
- ${kernel_addr_r}. The value is cached so that it only needs to be
- actively read once.
-
- Args:
- None.
-
- Returns:
- The address of U-Boot's first RAM bank, as an integer.
- '''
-
- if self.config.buildconfig.get('config_cmd_bdi', 'n') != 'y':
- pytest.skip('bdinfo command not supported')
- if self.ram_base == -1:
- pytest.skip('Previously failed to find RAM bank start')
- if self.ram_base is not None:
- return self.ram_base
-
- with self.log.section('find_ram_base'):
- response = self.run_command('bdinfo')
- for l in response.split('\n'):
- if '-> start' in l:
- self.ram_base = int(l.split('=')[1].strip(), 16)
- break
- if self.ram_base is None:
- self.ram_base = -1
- raise Exception('Failed to find RAM bank start in `bdinfo`')
-
- return self.ram_base
diff --git a/test/py/u_boot_console_exec_attach.py b/test/py/u_boot_console_exec_attach.py
index 0ca9e7c..19520cb 100644
--- a/test/py/u_boot_console_exec_attach.py
+++ b/test/py/u_boot_console_exec_attach.py
@@ -11,15 +11,15 @@ from u_boot_spawn import Spawn
from u_boot_console_base import ConsoleBase
class ConsoleExecAttach(ConsoleBase):
- '''Represents a physical connection to a U-Boot console, typically via a
+ """Represents a physical connection to a U-Boot console, typically via a
serial port. This implementation executes a sub-process to attach to the
console, expecting that the stdin/out of the sub-process will be forwarded
to/from the physical hardware. This approach isolates the test infra-
structure from the user-/installation-specific details of how to
- communicate with, and the identity of, serial ports etc.'''
+ communicate with, and the identity of, serial ports etc."""
def __init__(self, log, config):
- '''Initialize a U-Boot console connection.
+ """Initialize a U-Boot console connection.
Args:
log: A multiplexed_log.Logfile instance.
@@ -27,7 +27,7 @@ class ConsoleExecAttach(ConsoleBase):
Returns:
Nothing.
- '''
+ """
# The max_fifo_fill value might need tweaking per-board/-SoC?
# 1 would be safe anywhere, but is very slow (a pexpect issue?).
@@ -42,7 +42,7 @@ class ConsoleExecAttach(ConsoleBase):
runner.close()
def get_spawn(self):
- '''Connect to a fresh U-Boot instance.
+ """Connect to a fresh U-Boot instance.
The target board is reset, so that U-Boot begins running from scratch.
@@ -51,7 +51,7 @@ class ConsoleExecAttach(ConsoleBase):
Returns:
A u_boot_spawn.Spawn object that is attached to U-Boot.
- '''
+ """
args = [self.config.board_type, self.config.board_identity]
s = Spawn(['u-boot-test-console'] + args)
diff --git a/test/py/u_boot_console_sandbox.py b/test/py/u_boot_console_sandbox.py
index 88b137e..a7263f3 100644
--- a/test/py/u_boot_console_sandbox.py
+++ b/test/py/u_boot_console_sandbox.py
@@ -10,11 +10,11 @@ from u_boot_spawn import Spawn
from u_boot_console_base import ConsoleBase
class ConsoleSandbox(ConsoleBase):
- '''Represents a connection to a sandbox U-Boot console, executed as a sub-
- process.'''
+ """Represents a connection to a sandbox U-Boot console, executed as a sub-
+ process."""
def __init__(self, log, config):
- '''Initialize a U-Boot console connection.
+ """Initialize a U-Boot console connection.
Args:
log: A multiplexed_log.Logfile instance.
@@ -22,12 +22,12 @@ class ConsoleSandbox(ConsoleBase):
Returns:
Nothing.
- '''
+ """
super(ConsoleSandbox, self).__init__(log, config, max_fifo_fill=1024)
def get_spawn(self):
- '''Connect to a fresh U-Boot instance.
+ """Connect to a fresh U-Boot instance.
A new sandbox process is created, so that U-Boot begins running from
scratch.
@@ -37,26 +37,30 @@ class ConsoleSandbox(ConsoleBase):
Returns:
A u_boot_spawn.Spawn object that is attached to U-Boot.
- '''
+ """
- return Spawn([self.config.build_dir + '/u-boot'])
+ cmd = [
+ self.config.build_dir + '/u-boot',
+ '-d',
+ self.config.build_dir + '/arch/sandbox/dts/test.dtb'
+ ]
+ return Spawn(cmd, cwd=self.config.source_dir)
def kill(self, sig):
- '''Send a specific Unix signal to the sandbox process.
+ """Send a specific Unix signal to the sandbox process.
Args:
sig: The Unix signal to send to the process.
Returns:
Nothing.
- '''
+ """
- self.ensure_spawned()
self.log.action('kill %d' % sig)
self.p.kill(sig)
def validate_exited(self):
- '''Determine whether the sandbox process has exited.
+ """Determine whether the sandbox process has exited.
If required, this function waits a reasonable time for the process to
exit.
@@ -66,7 +70,7 @@ class ConsoleSandbox(ConsoleBase):
Returns:
Boolean indicating whether the process has exited.
- '''
+ """
p = self.p
self.p = None
diff --git a/test/py/u_boot_spawn.py b/test/py/u_boot_spawn.py
index 1baee63..0f52d3e 100644
--- a/test/py/u_boot_spawn.py
+++ b/test/py/u_boot_spawn.py
@@ -12,23 +12,25 @@ import select
import time
class Timeout(Exception):
- '''An exception sub-class that indicates that a timeout occurred.'''
+ """An exception sub-class that indicates that a timeout occurred."""
pass
class Spawn(object):
- '''Represents the stdio of a freshly created sub-process. Commands may be
+ """Represents the stdio of a freshly created sub-process. Commands may be
sent to the process, and responses waited for.
- '''
+ """
- def __init__(self, args):
- '''Spawn (fork/exec) the sub-process.
+ def __init__(self, args, cwd=None):
+ """Spawn (fork/exec) the sub-process.
Args:
- args: array of processs arguments. argv[0] is the command to execute.
+ args: array of processs arguments. argv[0] is the command to
+ execute.
+ cwd: the directory to run the process in, or None for no change.
Returns:
Nothing.
- '''
+ """
self.waited = False
self.buf = ''
@@ -44,6 +46,8 @@ class Spawn(object):
# run under "go" (www.go.cd). Perhaps this happens under any
# background (non-interactive) system?
signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ if cwd:
+ os.chdir(cwd)
os.execvp(args[0], args)
except:
print 'CHILD EXECEPTION:'
@@ -56,26 +60,26 @@ class Spawn(object):
self.poll.register(self.fd, select.POLLIN | select.POLLPRI | select.POLLERR | select.POLLHUP | select.POLLNVAL)
def kill(self, sig):
- '''Send unix signal "sig" to the child process.
+ """Send unix signal "sig" to the child process.
Args:
sig: The signal number to send.
Returns:
Nothing.
- '''
+ """
os.kill(self.pid, sig)
def isalive(self):
- '''Determine whether the child process is still running.
+ """Determine whether the child process is still running.
Args:
None.
Returns:
Boolean indicating whether process is alive.
- '''
+ """
if self.waited:
return False
@@ -88,19 +92,19 @@ class Spawn(object):
return False
def send(self, data):
- '''Send data to the sub-process's stdin.
+ """Send data to the sub-process's stdin.
Args:
data: The data to send to the process.
Returns:
Nothing.
- '''
+ """
os.write(self.fd, data)
def expect(self, patterns):
- '''Wait for the sub-process to emit specific data.
+ """Wait for the sub-process to emit specific data.
This function waits for the process to emit one pattern from the
supplied list of patterns, or for a timeout to occur.
@@ -116,12 +120,13 @@ class Spawn(object):
Notable exceptions:
Timeout, if the process did not emit any of the patterns within
the expected time.
- '''
+ """
for pi in xrange(len(patterns)):
if type(patterns[pi]) == type(''):
patterns[pi] = re.compile(patterns[pi])
+ tstart_s = time.time()
try:
while True:
earliest_m = None
@@ -131,7 +136,7 @@ class Spawn(object):
m = pattern.search(self.buf)
if not m:
continue
- if earliest_m and m.start() > earliest_m.start():
+ if earliest_m and m.start() >= earliest_m.start():
continue
earliest_m = m
earliest_pi = pi
@@ -142,7 +147,11 @@ class Spawn(object):
self.after = self.buf[pos:posafter]
self.buf = self.buf[posafter:]
return earliest_pi
- events = self.poll.poll(self.timeout)
+ tnow_s = time.time()
+ tdelta_ms = (tnow_s - tstart_s) * 1000
+ if tdelta_ms > self.timeout:
+ raise Timeout()
+ events = self.poll.poll(self.timeout - tdelta_ms)
if not events:
raise Timeout()
c = os.read(self.fd, 1024)
@@ -156,7 +165,7 @@ class Spawn(object):
self.logfile_read.flush()
def close(self):
- '''Close the stdio connection to the sub-process.
+ """Close the stdio connection to the sub-process.
This also waits a reasonable time for the sub-process to stop running.
@@ -165,7 +174,7 @@ class Spawn(object):
Returns:
Nothing.
- '''
+ """
os.close(self.fd)
for i in xrange(100):
diff --git a/test/py/u_boot_utils.py b/test/py/u_boot_utils.py
new file mode 100644
index 0000000..72d24e4
--- /dev/null
+++ b/test/py/u_boot_utils.py
@@ -0,0 +1,209 @@
+# Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved.
+#
+# SPDX-License-Identifier: GPL-2.0
+
+# Utility code shared across multiple tests.
+
+import hashlib
+import os
+import os.path
+import sys
+import time
+
+def md5sum_data(data):
+ """Calculate the MD5 hash of some data.
+
+ Args:
+ data: The data to hash.
+
+ Returns:
+ The hash of the data, as a binary string.
+ """
+
+ h = hashlib.md5()
+ h.update(data)
+ return h.digest()
+
+def md5sum_file(fn, max_length=None):
+ """Calculate the MD5 hash of the contents of a file.
+
+ Args:
+ fn: The filename of the file to hash.
+ max_length: The number of bytes to hash. If the file has more
+ bytes than this, they will be ignored. If None or omitted, the
+ entire file will be hashed.
+
+ Returns:
+ The hash of the file content, as a binary string.
+ """
+
+ with open(fn, 'rb') as fh:
+ if max_length:
+ params = [max_length]
+ else:
+ params = []
+ data = fh.read(*params)
+ return md5sum_data(data)
+
+class PersistentRandomFile(object):
+ """Generate and store information about a persistent file containing
+ random data."""
+
+ def __init__(self, u_boot_console, fn, size):
+ """Create or process the persistent file.
+
+ If the file does not exist, it is generated.
+
+ If the file does exist, its content is hashed for later comparison.
+
+ These files are always located in the "persistent data directory" of
+ the current test run.
+
+ Args:
+ u_boot_console: A console connection to U-Boot.
+ fn: The filename (without path) to create.
+ size: The desired size of the file in bytes.
+
+ Returns:
+ Nothing.
+ """
+
+ self.fn = fn
+
+ self.abs_fn = u_boot_console.config.persistent_data_dir + '/' + fn
+
+ if os.path.exists(self.abs_fn):
+ u_boot_console.log.action('Persistent data file ' + self.abs_fn +
+ ' already exists')
+ self.content_hash = md5sum_file(self.abs_fn)
+ else:
+ u_boot_console.log.action('Generating ' + self.abs_fn +
+ ' (random, persistent, %d bytes)' % size)
+ data = os.urandom(size)
+ with open(self.abs_fn, 'wb') as fh:
+ fh.write(data)
+ self.content_hash = md5sum_data(data)
+
+def attempt_to_open_file(fn):
+ """Attempt to open a file, without throwing exceptions.
+
+ Any errors (exceptions) that occur during the attempt to open the file
+ are ignored. This is useful in order to test whether a file (in
+ particular, a device node) exists and can be successfully opened, in order
+ to poll for e.g. USB enumeration completion.
+
+ Args:
+ fn: The filename to attempt to open.
+
+ Returns:
+ An open file handle to the file, or None if the file could not be
+ opened.
+ """
+
+ try:
+ return open(fn, 'rb')
+ except:
+ return None
+
+def wait_until_open_succeeds(fn):
+ """Poll until a file can be opened, or a timeout occurs.
+
+ Continually attempt to open a file, and return when this succeeds, or
+ raise an exception after a timeout.
+
+ Args:
+ fn: The filename to attempt to open.
+
+ Returns:
+ An open file handle to the file.
+ """
+
+ for i in xrange(100):
+ fh = attempt_to_open_file(fn)
+ if fh:
+ return fh
+ time.sleep(0.1)
+ raise Exception('File could not be opened')
+
+def wait_until_file_open_fails(fn, ignore_errors):
+ """Poll until a file cannot be opened, or a timeout occurs.
+
+ Continually attempt to open a file, and return when this fails, or
+ raise an exception after a timeout.
+
+ Args:
+ fn: The filename to attempt to open.
+ ignore_errors: Indicate whether to ignore timeout errors. If True, the
+ function will simply return if a timeout occurs, otherwise an
+ exception will be raised.
+
+ Returns:
+ Nothing.
+ """
+
+ for i in xrange(100):
+ fh = attempt_to_open_file(fn)
+ if not fh:
+ return
+ fh.close()
+ time.sleep(0.1)
+ if ignore_errors:
+ return
+ raise Exception('File can still be opened')
+
+def run_and_log(u_boot_console, cmd, ignore_errors=False):
+ """Run a command and log its output.
+
+ Args:
+ u_boot_console: A console connection to U-Boot.
+ cmd: The command to run, as an array of argv[].
+ ignore_errors: Indicate whether to ignore errors. If True, the function
+ will simply return if the command cannot be executed or exits with
+ an error code, otherwise an exception will be raised if such
+ problems occur.
+
+ Returns:
+ Nothing.
+ """
+
+ runner = u_boot_console.log.get_runner(cmd[0], sys.stdout)
+ runner.run(cmd, ignore_errors=ignore_errors)
+ runner.close()
+
+ram_base = None
+def find_ram_base(u_boot_console):
+ """Find the running U-Boot's RAM location.
+
+ Probe the running U-Boot to determine the address of the first bank
+ of RAM. This is useful for tests that test reading/writing RAM, or
+ load/save files that aren't associated with some standard address
+ typically represented in an environment variable such as
+ ${kernel_addr_r}. The value is cached so that it only needs to be
+ actively read once.
+
+ Args:
+ u_boot_console: A console connection to U-Boot.
+
+ Returns:
+ The address of U-Boot's first RAM bank, as an integer.
+ """
+
+ global ram_base
+ if u_boot_console.config.buildconfig.get('config_cmd_bdi', 'n') != 'y':
+ pytest.skip('bdinfo command not supported')
+ if ram_base == -1:
+ pytest.skip('Previously failed to find RAM bank start')
+ if ram_base is not None:
+ return ram_base
+
+ with u_boot_console.log.section('find_ram_base'):
+ response = u_boot_console.run_command('bdinfo')
+ for l in response.split('\n'):
+ if '-> start' in l:
+ ram_base = int(l.split('=')[1].strip(), 16)
+ break
+ if ram_base is None:
+ ram_base = -1
+ raise Exception('Failed to find RAM bank start in `bdinfo`')
+
+ return ram_base