815 lines
23 KiB
Python
815 lines
23 KiB
Python
# SPDX-License-Identifier: GPL-2.0
|
|
#
|
|
# Parses KTAP test results from a kernel dmesg log and incrementally prints
|
|
# results with reader-friendly format. Stores and returns test results in a
|
|
# Test object.
|
|
#
|
|
# Copyright (C) 2019, Google LLC.
|
|
# Author: Felix Guo <felixguoxiuping@gmail.com>
|
|
# Author: Brendan Higgins <brendanhiggins@google.com>
|
|
# Author: Rae Moar <rmoar@google.com>
|
|
|
|
from __future__ import annotations
|
|
from dataclasses import dataclass
|
|
import re
|
|
import textwrap
|
|
|
|
from enum import Enum, auto
|
|
from typing import Iterable, Iterator, List, Optional, Tuple
|
|
|
|
from kunit_printer import stdout
|
|
|
|
class Test:
|
|
"""
|
|
A class to represent a test parsed from KTAP results. All KTAP
|
|
results within a test log are stored in a main Test object as
|
|
subtests.
|
|
|
|
Attributes:
|
|
status : TestStatus - status of the test
|
|
name : str - name of the test
|
|
expected_count : int - expected number of subtests (0 if single
|
|
test case and None if unknown expected number of subtests)
|
|
subtests : List[Test] - list of subtests
|
|
log : List[str] - log of KTAP lines that correspond to the test
|
|
counts : TestCounts - counts of the test statuses and errors of
|
|
subtests or of the test itself if the test is a single
|
|
test case.
|
|
"""
|
|
def __init__(self) -> None:
|
|
"""Creates Test object with default attributes."""
|
|
self.status = TestStatus.TEST_CRASHED
|
|
self.name = ''
|
|
self.expected_count = 0 # type: Optional[int]
|
|
self.subtests = [] # type: List[Test]
|
|
self.log = [] # type: List[str]
|
|
self.counts = TestCounts()
|
|
|
|
def __str__(self) -> str:
|
|
"""Returns string representation of a Test class object."""
|
|
return (f'Test({self.status}, {self.name}, {self.expected_count}, '
|
|
f'{self.subtests}, {self.log}, {self.counts})')
|
|
|
|
def __repr__(self) -> str:
|
|
"""Returns string representation of a Test class object."""
|
|
return str(self)
|
|
|
|
def add_error(self, error_message: str) -> None:
|
|
"""Records an error that occurred while parsing this test."""
|
|
self.counts.errors += 1
|
|
stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
|
|
|
|
def ok_status(self) -> bool:
|
|
"""Returns true if the status was ok, i.e. passed or skipped."""
|
|
return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
|
|
|
|
class TestStatus(Enum):
|
|
"""An enumeration class to represent the status of a test."""
|
|
SUCCESS = auto()
|
|
FAILURE = auto()
|
|
SKIPPED = auto()
|
|
TEST_CRASHED = auto()
|
|
NO_TESTS = auto()
|
|
FAILURE_TO_PARSE_TESTS = auto()
|
|
|
|
@dataclass
|
|
class TestCounts:
|
|
"""
|
|
Tracks the counts of statuses of all test cases and any errors within
|
|
a Test.
|
|
"""
|
|
passed: int = 0
|
|
failed: int = 0
|
|
crashed: int = 0
|
|
skipped: int = 0
|
|
errors: int = 0
|
|
|
|
def __str__(self) -> str:
|
|
"""Returns the string representation of a TestCounts object."""
|
|
statuses = [('passed', self.passed), ('failed', self.failed),
|
|
('crashed', self.crashed), ('skipped', self.skipped),
|
|
('errors', self.errors)]
|
|
return f'Ran {self.total()} tests: ' + \
|
|
', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
|
|
|
|
def total(self) -> int:
|
|
"""Returns the total number of test cases within a test
|
|
object, where a test case is a test with no subtests.
|
|
"""
|
|
return (self.passed + self.failed + self.crashed +
|
|
self.skipped)
|
|
|
|
def add_subtest_counts(self, counts: TestCounts) -> None:
|
|
"""
|
|
Adds the counts of another TestCounts object to the current
|
|
TestCounts object. Used to add the counts of a subtest to the
|
|
parent test.
|
|
|
|
Parameters:
|
|
counts - a different TestCounts object whose counts
|
|
will be added to the counts of the TestCounts object
|
|
"""
|
|
self.passed += counts.passed
|
|
self.failed += counts.failed
|
|
self.crashed += counts.crashed
|
|
self.skipped += counts.skipped
|
|
self.errors += counts.errors
|
|
|
|
def get_status(self) -> TestStatus:
|
|
"""Returns the aggregated status of a Test using test
|
|
counts.
|
|
"""
|
|
if self.total() == 0:
|
|
return TestStatus.NO_TESTS
|
|
if self.crashed:
|
|
# Crashes should take priority.
|
|
return TestStatus.TEST_CRASHED
|
|
if self.failed:
|
|
return TestStatus.FAILURE
|
|
if self.passed:
|
|
# No failures or crashes, looks good!
|
|
return TestStatus.SUCCESS
|
|
# We have only skipped tests.
|
|
return TestStatus.SKIPPED
|
|
|
|
def add_status(self, status: TestStatus) -> None:
|
|
"""Increments the count for `status`."""
|
|
if status == TestStatus.SUCCESS:
|
|
self.passed += 1
|
|
elif status == TestStatus.FAILURE:
|
|
self.failed += 1
|
|
elif status == TestStatus.SKIPPED:
|
|
self.skipped += 1
|
|
elif status != TestStatus.NO_TESTS:
|
|
self.crashed += 1
|
|
|
|
class LineStream:
|
|
"""
|
|
A class to represent the lines of kernel output.
|
|
Provides a lazy peek()/pop() interface over an iterator of
|
|
(line#, text).
|
|
"""
|
|
_lines: Iterator[Tuple[int, str]]
|
|
_next: Tuple[int, str]
|
|
_need_next: bool
|
|
_done: bool
|
|
|
|
def __init__(self, lines: Iterator[Tuple[int, str]]):
|
|
"""Creates a new LineStream that wraps the given iterator."""
|
|
self._lines = lines
|
|
self._done = False
|
|
self._need_next = True
|
|
self._next = (0, '')
|
|
|
|
def _get_next(self) -> None:
|
|
"""Advances the LineSteam to the next line, if necessary."""
|
|
if not self._need_next:
|
|
return
|
|
try:
|
|
self._next = next(self._lines)
|
|
except StopIteration:
|
|
self._done = True
|
|
finally:
|
|
self._need_next = False
|
|
|
|
def peek(self) -> str:
|
|
"""Returns the current line, without advancing the LineStream.
|
|
"""
|
|
self._get_next()
|
|
return self._next[1]
|
|
|
|
def pop(self) -> str:
|
|
"""Returns the current line and advances the LineStream to
|
|
the next line.
|
|
"""
|
|
s = self.peek()
|
|
if self._done:
|
|
raise ValueError(f'LineStream: going past EOF, last line was {s}')
|
|
self._need_next = True
|
|
return s
|
|
|
|
def __bool__(self) -> bool:
|
|
"""Returns True if stream has more lines."""
|
|
self._get_next()
|
|
return not self._done
|
|
|
|
# Only used by kunit_tool_test.py.
|
|
def __iter__(self) -> Iterator[str]:
|
|
"""Empties all lines stored in LineStream object into
|
|
Iterator object and returns the Iterator object.
|
|
"""
|
|
while bool(self):
|
|
yield self.pop()
|
|
|
|
def line_number(self) -> int:
|
|
"""Returns the line number of the current line."""
|
|
self._get_next()
|
|
return self._next[0]
|
|
|
|
# Parsing helper methods:
|
|
|
|
KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
|
|
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
|
|
KTAP_END = re.compile(r'\s*(List of all partitions:|'
|
|
'Kernel panic - not syncing: VFS:|reboot: System halted)')
|
|
|
|
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
|
|
"""Extracts KTAP lines from the kernel output."""
|
|
def isolate_ktap_output(kernel_output: Iterable[str]) \
|
|
-> Iterator[Tuple[int, str]]:
|
|
line_num = 0
|
|
started = False
|
|
for line in kernel_output:
|
|
line_num += 1
|
|
line = line.rstrip() # remove trailing \n
|
|
if not started and KTAP_START.search(line):
|
|
# start extracting KTAP lines and set prefix
|
|
# to number of characters before version line
|
|
prefix_len = len(
|
|
line.split('KTAP version')[0])
|
|
started = True
|
|
yield line_num, line[prefix_len:]
|
|
elif not started and TAP_START.search(line):
|
|
# start extracting KTAP lines and set prefix
|
|
# to number of characters before version line
|
|
prefix_len = len(line.split('TAP version')[0])
|
|
started = True
|
|
yield line_num, line[prefix_len:]
|
|
elif started and KTAP_END.search(line):
|
|
# stop extracting KTAP lines
|
|
break
|
|
elif started:
|
|
# remove the prefix, if any.
|
|
line = line[prefix_len:]
|
|
yield line_num, line
|
|
return LineStream(lines=isolate_ktap_output(kernel_output))
|
|
|
|
KTAP_VERSIONS = [1]
|
|
TAP_VERSIONS = [13, 14]
|
|
|
|
def check_version(version_num: int, accepted_versions: List[int],
|
|
version_type: str, test: Test) -> None:
|
|
"""
|
|
Adds error to test object if version number is too high or too
|
|
low.
|
|
|
|
Parameters:
|
|
version_num - The inputted version number from the parsed KTAP or TAP
|
|
header line
|
|
accepted_version - List of accepted KTAP or TAP versions
|
|
version_type - 'KTAP' or 'TAP' depending on the type of
|
|
version line.
|
|
test - Test object for current test being parsed
|
|
"""
|
|
if version_num < min(accepted_versions):
|
|
test.add_error(f'{version_type} version lower than expected!')
|
|
elif version_num > max(accepted_versions):
|
|
test.add_error(f'{version_type} version higer than expected!')
|
|
|
|
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
|
|
"""
|
|
Parses KTAP/TAP header line and checks version number.
|
|
Returns False if fails to parse KTAP/TAP header line.
|
|
|
|
Accepted formats:
|
|
- 'KTAP version [version number]'
|
|
- 'TAP version [version number]'
|
|
|
|
Parameters:
|
|
lines - LineStream of KTAP output to parse
|
|
test - Test object for current test being parsed
|
|
|
|
Return:
|
|
True if successfully parsed KTAP/TAP header line
|
|
"""
|
|
ktap_match = KTAP_START.match(lines.peek())
|
|
tap_match = TAP_START.match(lines.peek())
|
|
if ktap_match:
|
|
version_num = int(ktap_match.group(1))
|
|
check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
|
|
elif tap_match:
|
|
version_num = int(tap_match.group(1))
|
|
check_version(version_num, TAP_VERSIONS, 'TAP', test)
|
|
else:
|
|
return False
|
|
lines.pop()
|
|
return True
|
|
|
|
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
|
|
|
|
def parse_test_header(lines: LineStream, test: Test) -> bool:
|
|
"""
|
|
Parses test header and stores test name in test object.
|
|
Returns False if fails to parse test header line.
|
|
|
|
Accepted format:
|
|
- '# Subtest: [test name]'
|
|
|
|
Parameters:
|
|
lines - LineStream of KTAP output to parse
|
|
test - Test object for current test being parsed
|
|
|
|
Return:
|
|
True if successfully parsed test header line
|
|
"""
|
|
match = TEST_HEADER.match(lines.peek())
|
|
if not match:
|
|
return False
|
|
test.name = match.group(1)
|
|
lines.pop()
|
|
return True
|
|
|
|
TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
|
|
|
|
def parse_test_plan(lines: LineStream, test: Test) -> bool:
|
|
"""
|
|
Parses test plan line and stores the expected number of subtests in
|
|
test object. Reports an error if expected count is 0.
|
|
Returns False and sets expected_count to None if there is no valid test
|
|
plan.
|
|
|
|
Accepted format:
|
|
- '1..[number of subtests]'
|
|
|
|
Parameters:
|
|
lines - LineStream of KTAP output to parse
|
|
test - Test object for current test being parsed
|
|
|
|
Return:
|
|
True if successfully parsed test plan line
|
|
"""
|
|
match = TEST_PLAN.match(lines.peek())
|
|
if not match:
|
|
test.expected_count = None
|
|
return False
|
|
expected_count = int(match.group(1))
|
|
test.expected_count = expected_count
|
|
lines.pop()
|
|
return True
|
|
|
|
TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
|
|
|
|
TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
|
|
|
|
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
|
|
"""
|
|
Matches current line with the format of a test result line and checks
|
|
if the name matches the name of the current test.
|
|
Returns False if fails to match format or name.
|
|
|
|
Accepted format:
|
|
- '[ok|not ok] [test number] [-] [test name] [optional skip
|
|
directive]'
|
|
|
|
Parameters:
|
|
lines - LineStream of KTAP output to parse
|
|
test - Test object for current test being parsed
|
|
|
|
Return:
|
|
True if matched a test result line and the name matching the
|
|
expected test name
|
|
"""
|
|
line = lines.peek()
|
|
match = TEST_RESULT.match(line)
|
|
if not match:
|
|
return False
|
|
name = match.group(4)
|
|
return name == test.name
|
|
|
|
def parse_test_result(lines: LineStream, test: Test,
|
|
expected_num: int) -> bool:
|
|
"""
|
|
Parses test result line and stores the status and name in the test
|
|
object. Reports an error if the test number does not match expected
|
|
test number.
|
|
Returns False if fails to parse test result line.
|
|
|
|
Note that the SKIP directive is the only direction that causes a
|
|
change in status.
|
|
|
|
Accepted format:
|
|
- '[ok|not ok] [test number] [-] [test name] [optional skip
|
|
directive]'
|
|
|
|
Parameters:
|
|
lines - LineStream of KTAP output to parse
|
|
test - Test object for current test being parsed
|
|
expected_num - expected test number for current test
|
|
|
|
Return:
|
|
True if successfully parsed a test result line.
|
|
"""
|
|
line = lines.peek()
|
|
match = TEST_RESULT.match(line)
|
|
skip_match = TEST_RESULT_SKIP.match(line)
|
|
|
|
# Check if line matches test result line format
|
|
if not match:
|
|
return False
|
|
lines.pop()
|
|
|
|
# Set name of test object
|
|
if skip_match:
|
|
test.name = skip_match.group(4)
|
|
else:
|
|
test.name = match.group(4)
|
|
|
|
# Check test num
|
|
num = int(match.group(2))
|
|
if num != expected_num:
|
|
test.add_error(f'Expected test number {expected_num} but found {num}')
|
|
|
|
# Set status of test object
|
|
status = match.group(1)
|
|
if skip_match:
|
|
test.status = TestStatus.SKIPPED
|
|
elif status == 'ok':
|
|
test.status = TestStatus.SUCCESS
|
|
else:
|
|
test.status = TestStatus.FAILURE
|
|
return True
|
|
|
|
def parse_diagnostic(lines: LineStream) -> List[str]:
|
|
"""
|
|
Parse lines that do not match the format of a test result line or
|
|
test header line and returns them in list.
|
|
|
|
Line formats that are not parsed:
|
|
- '# Subtest: [test name]'
|
|
- '[ok|not ok] [test number] [-] [test name] [optional skip
|
|
directive]'
|
|
- 'KTAP version [version number]'
|
|
|
|
Parameters:
|
|
lines - LineStream of KTAP output to parse
|
|
|
|
Return:
|
|
Log of diagnostic lines
|
|
"""
|
|
log = [] # type: List[str]
|
|
non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START]
|
|
while lines and not any(re.match(lines.peek())
|
|
for re in non_diagnostic_lines):
|
|
log.append(lines.pop())
|
|
return log
|
|
|
|
|
|
# Printing helper methods:
|
|
|
|
DIVIDER = '=' * 60
|
|
|
|
def format_test_divider(message: str, len_message: int) -> str:
|
|
"""
|
|
Returns string with message centered in fixed width divider.
|
|
|
|
Example:
|
|
'===================== message example ====================='
|
|
|
|
Parameters:
|
|
message - message to be centered in divider line
|
|
len_message - length of the message to be printed such that
|
|
any characters of the color codes are not counted
|
|
|
|
Return:
|
|
String containing message centered in fixed width divider
|
|
"""
|
|
default_count = 3 # default number of dashes
|
|
len_1 = default_count
|
|
len_2 = default_count
|
|
difference = len(DIVIDER) - len_message - 2 # 2 spaces added
|
|
if difference > 0:
|
|
# calculate number of dashes for each side of the divider
|
|
len_1 = int(difference / 2)
|
|
len_2 = difference - len_1
|
|
return ('=' * len_1) + f' {message} ' + ('=' * len_2)
|
|
|
|
def print_test_header(test: Test) -> None:
|
|
"""
|
|
Prints test header with test name and optionally the expected number
|
|
of subtests.
|
|
|
|
Example:
|
|
'=================== example (2 subtests) ==================='
|
|
|
|
Parameters:
|
|
test - Test object representing current test being printed
|
|
"""
|
|
message = test.name
|
|
if message != "":
|
|
# Add a leading space before the subtest counts only if a test name
|
|
# is provided using a "# Subtest" header line.
|
|
message += " "
|
|
if test.expected_count:
|
|
if test.expected_count == 1:
|
|
message += '(1 subtest)'
|
|
else:
|
|
message += f'({test.expected_count} subtests)'
|
|
stdout.print_with_timestamp(format_test_divider(message, len(message)))
|
|
|
|
def print_log(log: Iterable[str]) -> None:
|
|
"""Prints all strings in saved log for test in yellow."""
|
|
formatted = textwrap.dedent('\n'.join(log))
|
|
for line in formatted.splitlines():
|
|
stdout.print_with_timestamp(stdout.yellow(line))
|
|
|
|
def format_test_result(test: Test) -> str:
|
|
"""
|
|
Returns string with formatted test result with colored status and test
|
|
name.
|
|
|
|
Example:
|
|
'[PASSED] example'
|
|
|
|
Parameters:
|
|
test - Test object representing current test being printed
|
|
|
|
Return:
|
|
String containing formatted test result
|
|
"""
|
|
if test.status == TestStatus.SUCCESS:
|
|
return stdout.green('[PASSED] ') + test.name
|
|
if test.status == TestStatus.SKIPPED:
|
|
return stdout.yellow('[SKIPPED] ') + test.name
|
|
if test.status == TestStatus.NO_TESTS:
|
|
return stdout.yellow('[NO TESTS RUN] ') + test.name
|
|
if test.status == TestStatus.TEST_CRASHED:
|
|
print_log(test.log)
|
|
return stdout.red('[CRASHED] ') + test.name
|
|
print_log(test.log)
|
|
return stdout.red('[FAILED] ') + test.name
|
|
|
|
def print_test_result(test: Test) -> None:
|
|
"""
|
|
Prints result line with status of test.
|
|
|
|
Example:
|
|
'[PASSED] example'
|
|
|
|
Parameters:
|
|
test - Test object representing current test being printed
|
|
"""
|
|
stdout.print_with_timestamp(format_test_result(test))
|
|
|
|
def print_test_footer(test: Test) -> None:
|
|
"""
|
|
Prints test footer with status of test.
|
|
|
|
Example:
|
|
'===================== [PASSED] example ====================='
|
|
|
|
Parameters:
|
|
test - Test object representing current test being printed
|
|
"""
|
|
message = format_test_result(test)
|
|
stdout.print_with_timestamp(format_test_divider(message,
|
|
len(message) - stdout.color_len()))
|
|
|
|
|
|
|
|
def _summarize_failed_tests(test: Test) -> str:
|
|
"""Tries to summarize all the failing subtests in `test`."""
|
|
|
|
def failed_names(test: Test, parent_name: str) -> List[str]:
|
|
# Note: we use 'main' internally for the top-level test.
|
|
if not parent_name or parent_name == 'main':
|
|
full_name = test.name
|
|
else:
|
|
full_name = parent_name + '.' + test.name
|
|
|
|
if not test.subtests: # this is a leaf node
|
|
return [full_name]
|
|
|
|
# If all the children failed, just say this subtest failed.
|
|
# Don't summarize it down "the top-level test failed", though.
|
|
failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
|
|
if parent_name and len(failed_subtests) == len(test.subtests):
|
|
return [full_name]
|
|
|
|
all_failures = [] # type: List[str]
|
|
for t in failed_subtests:
|
|
all_failures.extend(failed_names(t, full_name))
|
|
return all_failures
|
|
|
|
failures = failed_names(test, '')
|
|
# If there are too many failures, printing them out will just be noisy.
|
|
if len(failures) > 10: # this is an arbitrary limit
|
|
return ''
|
|
|
|
return 'Failures: ' + ', '.join(failures)
|
|
|
|
|
|
def print_summary_line(test: Test) -> None:
|
|
"""
|
|
Prints summary line of test object. Color of line is dependent on
|
|
status of test. Color is green if test passes, yellow if test is
|
|
skipped, and red if the test fails or crashes. Summary line contains
|
|
counts of the statuses of the tests subtests or the test itself if it
|
|
has no subtests.
|
|
|
|
Example:
|
|
"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
|
|
Errors: 0"
|
|
|
|
test - Test object representing current test being printed
|
|
"""
|
|
if test.status == TestStatus.SUCCESS:
|
|
color = stdout.green
|
|
elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
|
|
color = stdout.yellow
|
|
else:
|
|
color = stdout.red
|
|
stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
|
|
|
|
# Summarize failures that might have gone off-screen since we had a lot
|
|
# of tests (arbitrarily defined as >=100 for now).
|
|
if test.ok_status() or test.counts.total() < 100:
|
|
return
|
|
summarized = _summarize_failed_tests(test)
|
|
if not summarized:
|
|
return
|
|
stdout.print_with_timestamp(color(summarized))
|
|
|
|
# Other methods:
|
|
|
|
def bubble_up_test_results(test: Test) -> None:
|
|
"""
|
|
If the test has subtests, add the test counts of the subtests to the
|
|
test and check if any of the tests crashed and if so set the test
|
|
status to crashed. Otherwise if the test has no subtests add the
|
|
status of the test to the test counts.
|
|
|
|
Parameters:
|
|
test - Test object for current test being parsed
|
|
"""
|
|
subtests = test.subtests
|
|
counts = test.counts
|
|
status = test.status
|
|
for t in subtests:
|
|
counts.add_subtest_counts(t.counts)
|
|
if counts.total() == 0:
|
|
counts.add_status(status)
|
|
elif test.counts.get_status() == TestStatus.TEST_CRASHED:
|
|
test.status = TestStatus.TEST_CRASHED
|
|
|
|
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
|
|
"""
|
|
Finds next test to parse in LineStream, creates new Test object,
|
|
parses any subtests of the test, populates Test object with all
|
|
information (status, name) about the test and the Test objects for
|
|
any subtests, and then returns the Test object. The method accepts
|
|
three formats of tests:
|
|
|
|
Accepted test formats:
|
|
|
|
- Main KTAP/TAP header
|
|
|
|
Example:
|
|
|
|
KTAP version 1
|
|
1..4
|
|
[subtests]
|
|
|
|
- Subtest header (must include either the KTAP version line or
|
|
"# Subtest" header line)
|
|
|
|
Example (preferred format with both KTAP version line and
|
|
"# Subtest" line):
|
|
|
|
KTAP version 1
|
|
# Subtest: name
|
|
1..3
|
|
[subtests]
|
|
ok 1 name
|
|
|
|
Example (only "# Subtest" line):
|
|
|
|
# Subtest: name
|
|
1..3
|
|
[subtests]
|
|
ok 1 name
|
|
|
|
Example (only KTAP version line, compliant with KTAP v1 spec):
|
|
|
|
KTAP version 1
|
|
1..3
|
|
[subtests]
|
|
ok 1 name
|
|
|
|
- Test result line
|
|
|
|
Example:
|
|
|
|
ok 1 - test
|
|
|
|
Parameters:
|
|
lines - LineStream of KTAP output to parse
|
|
expected_num - expected test number for test to be parsed
|
|
log - list of strings containing any preceding diagnostic lines
|
|
corresponding to the current test
|
|
is_subtest - boolean indicating whether test is a subtest
|
|
|
|
Return:
|
|
Test object populated with characteristics and any subtests
|
|
"""
|
|
test = Test()
|
|
test.log.extend(log)
|
|
if not is_subtest:
|
|
# If parsing the main/top-level test, parse KTAP version line and
|
|
# test plan
|
|
test.name = "main"
|
|
ktap_line = parse_ktap_header(lines, test)
|
|
parse_test_plan(lines, test)
|
|
parent_test = True
|
|
else:
|
|
# If not the main test, attempt to parse a test header containing
|
|
# the KTAP version line and/or subtest header line
|
|
ktap_line = parse_ktap_header(lines, test)
|
|
subtest_line = parse_test_header(lines, test)
|
|
parent_test = (ktap_line or subtest_line)
|
|
if parent_test:
|
|
# If KTAP version line and/or subtest header is found, attempt
|
|
# to parse test plan and print test header
|
|
parse_test_plan(lines, test)
|
|
print_test_header(test)
|
|
expected_count = test.expected_count
|
|
subtests = []
|
|
test_num = 1
|
|
while parent_test and (expected_count is None or test_num <= expected_count):
|
|
# Loop to parse any subtests.
|
|
# Break after parsing expected number of tests or
|
|
# if expected number of tests is unknown break when test
|
|
# result line with matching name to subtest header is found
|
|
# or no more lines in stream.
|
|
sub_log = parse_diagnostic(lines)
|
|
sub_test = Test()
|
|
if not lines or (peek_test_name_match(lines, test) and
|
|
is_subtest):
|
|
if expected_count and test_num <= expected_count:
|
|
# If parser reaches end of test before
|
|
# parsing expected number of subtests, print
|
|
# crashed subtest and record error
|
|
test.add_error('missing expected subtest!')
|
|
sub_test.log.extend(sub_log)
|
|
test.counts.add_status(
|
|
TestStatus.TEST_CRASHED)
|
|
print_test_result(sub_test)
|
|
else:
|
|
test.log.extend(sub_log)
|
|
break
|
|
else:
|
|
sub_test = parse_test(lines, test_num, sub_log, True)
|
|
subtests.append(sub_test)
|
|
test_num += 1
|
|
test.subtests = subtests
|
|
if is_subtest:
|
|
# If not main test, look for test result line
|
|
test.log.extend(parse_diagnostic(lines))
|
|
if test.name != "" and not peek_test_name_match(lines, test):
|
|
test.add_error('missing subtest result line!')
|
|
else:
|
|
parse_test_result(lines, test, expected_num)
|
|
|
|
# Check for there being no subtests within parent test
|
|
if parent_test and len(subtests) == 0:
|
|
# Don't override a bad status if this test had one reported.
|
|
# Assumption: no subtests means CRASHED is from Test.__init__()
|
|
if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
|
|
test.status = TestStatus.NO_TESTS
|
|
test.add_error('0 tests run!')
|
|
|
|
# Add statuses to TestCounts attribute in Test object
|
|
bubble_up_test_results(test)
|
|
if parent_test and is_subtest:
|
|
# If test has subtests and is not the main test object, print
|
|
# footer.
|
|
print_test_footer(test)
|
|
elif is_subtest:
|
|
print_test_result(test)
|
|
return test
|
|
|
|
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
|
|
"""
|
|
Using kernel output, extract KTAP lines, parse the lines for test
|
|
results and print condensed test results and summary line.
|
|
|
|
Parameters:
|
|
kernel_output - Iterable object contains lines of kernel output
|
|
|
|
Return:
|
|
Test - the main test object with all subtests.
|
|
"""
|
|
stdout.print_with_timestamp(DIVIDER)
|
|
lines = extract_tap_lines(kernel_output)
|
|
test = Test()
|
|
if not lines:
|
|
test.name = '<missing>'
|
|
test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
|
|
test.status = TestStatus.FAILURE_TO_PARSE_TESTS
|
|
else:
|
|
test = parse_test(lines, 0, [], False)
|
|
if test.status != TestStatus.NO_TESTS:
|
|
test.status = test.counts.get_status()
|
|
stdout.print_with_timestamp(DIVIDER)
|
|
print_summary_line(test)
|
|
return test
|