3 Copyright (c) 2011-2016 ARM Limited
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
25 from sre_compile
import error
26 if (sys.version_info > (3, 0)):
27 from queue
import Empty
as QueueEmpty
29 from Queue
import Empty
as QueueEmpty
31 from mbed_host_tests
import BaseHostTest
32 from multiprocessing
import Process, Queue, Lock
33 from mbed_host_tests
import host_tests_plugins
34 from ..host_tests_registry
import HostRegistry
52 """! Select default host_test supervision (replaced after auto detection) """
53 RESET_TYPE_SW_RST =
"software_reset"
54 RESET_TYPE_HW_RST =
"hardware_reset"
75 if options.enum_host_tests:
76 for path
in options.enum_host_tests:
77 self.
registryregistry.register_from_path(
78 path, verbose=options.verbose
81 if options.list_reg_hts:
82 print(self.
registryregistry.table(options.verbose))
85 if options.list_plugins:
86 host_tests_plugins.print_plugin_info()
91 version = pkg_resources.require(
"mbed-host-tests")[0].version
95 if options.send_break_cmd:
98 reset_type=options.forced_reset_type,
99 baudrate=options.baud_rate,
100 verbose=options.verbose)
103 if options.global_resource_mgr
or options.fast_model_connection:
106 self.
optionsoptions.skip_reset =
True
107 self.
optionsoptions.skip_flashing =
True
109 if options.compare_log:
110 with open(options.compare_log,
"r")
as f:
117 DefaultTestSelectorBase.__init__(self, options)
120 """! Check if host test object loaded is actually host test class
121 derived from 'mbed_host_tests.BaseHostTest()'
122 Additionally, if the host test class implements a custom ctor, it should
123 call BaseHostTest.__init__()
124 @param obj_instance Instance of host test derived class
125 @return True if obj_instance is derived from mbed_host_tests.BaseHostTest()
126 and BaseHostTest.__init__() was called, else return False
131 self.
loggerlogger.prn_inf(
"host test class: '%s'"% obj_instance.__class__)
134 if not isinstance(obj_instance, BaseHostTest):
137 self.
loggerlogger.prn_err(
"host test must inherit from mbed_host_tests.BaseHostTest() class")
141 if not obj_instance.base_host_test_inited():
142 self.
loggerlogger.prn_err(
"custom host test __init__() must call BaseHostTest.__init__(self)")
148 """! This function implements key-value protocol state-machine.
149 All events and the connector are handled here.
150 @return Return self.TestResults.RESULT_* enum
153 timeout_duration = 10
154 coverage_idle_timeout = 10
155 event_queue = Queue()
156 dut_event_queue = Queue()
158 def callback__notify_prn(key, value, timestamp):
159 """! Handles __notify_prn. Prints each line of the value as a separate log line """
160 for line
in value.splitlines():
161 self.
loggerlogger.prn_inf(line)
164 "__notify_prn" : callback__notify_prn
168 callbacks_consume =
True
170 callbacks__exit =
False
172 callbacks__exit_event_queue =
False
178 self.
loggerlogger.prn_inf(
"starting host test process...")
184 "port" : self.
mbedmbed.port,
185 "baudrate" : self.
mbedmbed.serial_baud,
186 "program_cycle_s" : self.
optionsoptions.program_cycle_s,
187 "reset_type" : self.
optionsoptions.forced_reset_type,
188 "target_id" : self.
optionsoptions.target_id,
189 "disk" : self.
optionsoptions.disk,
190 "polling_timeout" : self.
optionsoptions.polling_timeout,
191 "forced_reset_timeout" : self.
optionsoptions.forced_reset_timeout,
192 "sync_behavior" : self.
optionsoptions.sync_behavior,
193 "platform_name" : self.
optionsoptions.micro,
194 "image_path" : self.
mbedmbed.image_path,
195 "skip_reset": self.
optionsoptions.skip_reset,
196 "tags" : self.
optionsoptions.tag_filters,
197 "sync_timeout": self.
optionsoptions.sync_timeout
200 if self.
optionsoptions.global_resource_mgr:
201 grm_module, grm_host, grm_port = self.
optionsoptions.global_resource_mgr.split(
':')
204 "conn_resource" :
'grm',
205 "grm_module" : grm_module,
206 "grm_host" : grm_host,
207 "grm_port" : grm_port,
210 if self.
optionsoptions.fast_model_connection:
213 "conn_resource" :
'fmc',
214 "fm_config" : self.
optionsoptions.fast_model_connection
217 def start_conn_process():
219 args = (event_queue, dut_event_queue, config)
220 p = Process(target=conn_process, args=args)
225 def process_code_coverage(key, value, timestamp):
226 """! Process the found coverage key value and perform an idle
227 loop checking for more, timing out if there is no response from
228 the target within the idle timeout.
229 @param key The key from the first coverage event
230 @param value The value from the first coverage event
231 @param timestamp The timestamp from the first coverage event
232 @return The elapsed time taken by the processing of code coverage,
233 and the (key, value, and timestamp) of the next event
235 original_start_time = time()
239 callbacks[key](key, value, timestamp)
242 while (time() - start_time) < coverage_idle_timeout:
244 (key, value, timestamp) = event_queue.get(timeout=1)
250 if key ==
'__coverage_start' or key ==
'__rxd_line':
254 callbacks[key](key, value, timestamp)
257 elapsed_time = time() - original_start_time
258 return elapsed_time, (key, value, timestamp)
260 p = start_conn_process()
261 conn_process_started =
False
266 (key, value, timestamp) = event_queue.get(
267 timeout=
None if self.
optionsoptions.global_resource_mgr
else self.
optionsoptions.process_start_timeout)
269 if key ==
'__conn_process_start':
270 conn_process_started =
True
272 self.
loggerlogger.prn_err(
"First expected event was '__conn_process_start', received '%s' instead"% key)
275 self.
loggerlogger.prn_err(
"Conn process failed to start in %f sec"% self.
optionsoptions.process_start_timeout)
277 if not conn_process_started:
279 return self.RESULT_TIMEOUT
284 consume_preamble_events =
True
286 while (time() - start_time) < timeout_duration:
289 (key, value, timestamp) = event_queue.get(timeout=1)
295 if key ==
'__rxd_line':
297 f.write(
"%s\n" % value)
301 if key ==
'__rxd_line':
303 self.
loggerlogger.prn_inf(
"Target log matches compare log!")
307 if consume_preamble_events:
308 if key ==
'__timeout':
311 timeout_duration = int(value)
312 self.
loggerlogger.prn_inf(
"setting timeout to: %d sec"% int(value))
313 elif key ==
'__version':
316 elif key ==
'__host_test_name':
326 self.
test_supervisortest_supervisor.setup_communication(event_queue, dut_event_queue, config)
330 except (TypeError, ValueError):
332 self.
loggerlogger.prn_err(
"host test setup() failed, reason:")
333 self.
loggerlogger.prn_inf(
"==== Traceback start ====")
334 for line
in traceback.format_exc().splitlines():
336 self.
loggerlogger.prn_inf(
"==== Traceback end ====")
337 result = self.RESULT_ERROR
338 event_queue.put((
'__exit_event_queue', 0, time()))
340 self.
loggerlogger.prn_inf(
"host test setup() call...")
343 self.
loggerlogger.prn_inf(
"CALLBACKs updated")
345 self.
loggerlogger.prn_wrn(
"no CALLBACKs specified by host test")
346 self.
loggerlogger.prn_inf(
"host test detected: %s"% value)
348 self.
loggerlogger.prn_err(
"host test not detected: %s"% value)
349 result = self.RESULT_ERROR
350 event_queue.put((
'__exit_event_queue', 0, time()))
352 consume_preamble_events =
False
353 elif key ==
'__sync':
355 self.
loggerlogger.prn_inf(
"sync KV found, uuid=%s, timestamp=%f"% (str(value), timestamp))
356 elif key ==
'__notify_sync_failed':
358 self.
loggerlogger.prn_err(value)
359 self.
loggerlogger.prn_wrn(
"stopped to consume events due to %s event"% key)
360 callbacks_consume =
False
361 result = self.RESULT_SYNC_FAILED
362 event_queue.put((
'__exit_event_queue', 0, time()))
363 elif key ==
'__notify_conn_lost':
365 self.
loggerlogger.prn_err(value)
366 self.
loggerlogger.prn_wrn(
"stopped to consume events due to %s event"% key)
367 callbacks_consume =
False
368 result = self.RESULT_IO_SERIAL
369 event_queue.put((
'__exit_event_queue', 0, time()))
370 elif key ==
'__exit_event_queue':
372 self.
loggerlogger.prn_inf(
"%s received"% (key))
373 callbacks__exit_event_queue =
True
375 elif key.startswith(
'__'):
379 self.
loggerlogger.prn_err(
"orphan event in preamble phase: {{%s;%s}}, timestamp=%f"% (key, str(value), timestamp))
382 if key ==
'__coverage_start':
383 self.
loggerlogger.prn_inf(
"starting coverage idle timeout loop...")
384 elapsed_time, (key, value, timestamp) = process_code_coverage(key, value, timestamp)
387 timeout_duration += elapsed_time
388 self.
loggerlogger.prn_inf(
"exiting coverage idle timeout loop (elapsed_time: %.2f" % elapsed_time)
390 if key ==
'__notify_complete':
393 self.
loggerlogger.prn_inf(
"%s(%s)" % (key, str(value)))
395 event_queue.put((
'__exit_event_queue', 0, time()))
396 elif key ==
'__reset':
398 dut_event_queue.put((
'__reset',
True, time()))
399 elif key ==
'__reset_dut':
401 dut_event_queue.put((
'__host_test_finished',
True, time()))
404 if value == DefaultTestSelector.RESET_TYPE_SW_RST:
405 self.
loggerlogger.prn_inf(
"Performing software reset.")
407 elif value == DefaultTestSelector.RESET_TYPE_HW_RST:
408 self.
loggerlogger.prn_inf(
"Performing hard reset.")
410 self.
mbedmbed.hw_reset()
412 self.
loggerlogger.prn_err(
"Invalid reset type (%s). Supported types [%s]." %
413 (value,
", ".join([DefaultTestSelector.RESET_TYPE_HW_RST,
414 DefaultTestSelector.RESET_TYPE_SW_RST])))
415 self.
loggerlogger.prn_inf(
"Software reset will be performed.")
418 p = start_conn_process()
419 elif key ==
'__notify_conn_lost':
421 self.
loggerlogger.prn_err(value)
422 self.
loggerlogger.prn_wrn(
"stopped to consume events due to %s event"% key)
423 callbacks_consume =
False
424 result = self.RESULT_IO_SERIAL
425 event_queue.put((
'__exit_event_queue', 0, time()))
426 elif key ==
'__exit':
428 self.
loggerlogger.prn_inf(
"%s(%s)"% (key, str(value)))
429 callbacks__exit =
True
430 event_queue.put((
'__exit_event_queue', 0, time()))
431 elif key ==
'__exit_event_queue':
433 self.
loggerlogger.prn_inf(
"%s received"% (key))
434 callbacks__exit_event_queue =
True
436 elif key ==
'__timeout_set':
438 timeout_duration = int(value)
439 self.
loggerlogger.prn_inf(
"setting timeout to: %d sec"% int(value))
440 elif key ==
'__timeout_adjust':
442 timeout_duration = timeout_duration + int(value)
443 self.
loggerlogger.prn_inf(
"adjusting timeout with %d sec (now %d)" % (int(value), timeout_duration))
444 elif key
in callbacks:
446 callbacks[key](key, value, timestamp)
448 self.
loggerlogger.prn_err(
"orphan event in main phase: {{%s;%s}}, timestamp=%f"% (key, str(value), timestamp))
450 self.
loggerlogger.prn_err(
"something went wrong in event main loop!")
451 self.
loggerlogger.prn_inf(
"==== Traceback start ====")
452 for line
in traceback.format_exc().splitlines():
454 self.
loggerlogger.prn_inf(
"==== Traceback end ====")
455 result = self.RESULT_ERROR
457 time_duration = time() - start_time
458 self.
loggerlogger.prn_inf(
"test suite run finished after %.2f sec..."% time_duration)
465 dut_event_queue.put((
'__host_test_finished',
True, time()))
467 self.
loggerlogger.prn_inf(
"CONN exited with code: %s"% str(p.exitcode))
470 self.
loggerlogger.prn_inf(
"No events in queue" if event_queue.empty()
else "Some events in queue")
480 if callbacks_consume:
482 while not event_queue.empty():
484 (key, value, timestamp) = event_queue.get(timeout=1)
488 if key ==
'__notify_complete':
491 self.
loggerlogger.prn_inf(
"%s(%s)"% (key, str(value)))
493 elif key.startswith(
'__'):
496 elif key
in callbacks:
497 callbacks[key](key, value, timestamp)
499 self.
loggerlogger.prn_wrn(
">>> orphan event: {{%s;%s}}, timestamp=%f"% (key, str(value), timestamp))
500 self.
loggerlogger.prn_inf(
"stopped consuming events")
502 if result
is not None:
504 self.
loggerlogger.prn_inf(
"host test result() call skipped, received: %s"% str(result))
508 self.
loggerlogger.prn_inf(
"host test result(): %s"% str(result))
510 if not callbacks__exit:
511 self.
loggerlogger.prn_wrn(
"missing __exit event from DUT")
513 if not callbacks__exit_event_queue:
514 self.
loggerlogger.prn_wrn(
"missing __exit_event_queue event from host test")
517 if not callbacks__exit_event_queue
and result
is None:
518 self.
loggerlogger.prn_err(
"missing __exit_event_queue event from " + \
519 "host test and no result from host test, timeout...")
520 result = self.RESULT_TIMEOUT
522 self.
loggerlogger.prn_inf(
"calling blocking teardown()")
525 self.
loggerlogger.prn_inf(
"teardown() finished")
530 """! Test runner for host test.
532 @details This function will start executing test and forward test result via serial port
533 to test suite. This function is sensitive to work-flow flags such as --skip-flashing,
535 First, the function will flash the device with the binary, initialize the serial port for communication,
536 and reset the target. A handshake with the test case is then performed over the serial port; this is when the host
537 test reads property data sent over the serial port.
538 At the end of the procedure proper host test (defined in set properties) will be executed
539 and test execution timeout will be measured.
541 result = self.RESULT_UNDEF
548 if self.
optionsoptions.skip_flashing:
549 self.
loggerlogger.prn_inf(
"copy image onto target... SKIPPED!")
551 self.
loggerlogger.prn_inf(
"copy image onto target...")
552 result = self.
mbedmbed.copy_image()
554 result = self.RESULT_IOERR_COPY
558 test_result = self.
run_testrun_test()
560 if test_result ==
True:
561 result = self.RESULT_SUCCESS
562 elif test_result ==
False:
563 result = self.RESULT_FAILURE
564 elif test_result
is None:
565 result = self.RESULT_ERROR
570 self.
loggerlogger.prn_inf(
"{{result;%s}}"% result)
573 except KeyboardInterrupt:
578 Matches lines from compare log with the target serial output. Compare log lines are matched in sequence using index
579 self.compare_log_idx. Lines can be strings to be matched as is or regular expressions.
588 if regex
in line
or re.search(regex, line):
Yet another logger flavour.
Test class with serial port initialization.
def get_test_result_int(self, test_result_str)
Maps test result string to unique integer.
def get_hello_string(self)
def setup(self)
Setup and check if configuration for test is correct.
Select default host_test supervision (replaced after auto detection)
def is_host_test_obj_compatible(self, obj_instance)
Check if host test object loaded is actually host test class derived from 'mbed_host_tests....
def match_log(self, line)
def __init__(self, options)
ctor
def execute(self)
Test runner for host test.
def run_test(self)
This function implements key-value protocol state-machine.