Mbed Host Tests
conn_primitive_serial.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 """
3 mbed SDK
4 Copyright (c) 2011-2016 ARM Limited
5 
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 
10  http://www.apache.org/licenses/LICENSE-2.0
11 
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 """
18 
19 
20 import time
21 from serial import Serial, SerialException
22 from mbed_host_tests import host_tests_plugins
23 from mbed_host_tests.host_tests_plugins.host_test_plugins import HostTestPluginBase
24 from .conn_primitive import ConnectorPrimitive, ConnectorPrimitiveException
25 
26 
28  def __init__(self, name, port, baudrate, config):
29  ConnectorPrimitive.__init__(self, name)
30  self.portport = port
31  self.baudratebaudrate = int(baudrate)
32  self.read_timeoutread_timeout = 0.01 # 10 milli sec
33  self.write_timeoutwrite_timeout = 5
34  self.configconfig = config
35  self.target_idtarget_id = self.configconfig.get('target_id', None)
36  self.polling_timeoutpolling_timeoutpolling_timeout = config.get('polling_timeout', 60)
37  self.forced_reset_timeoutforced_reset_timeout = config.get('forced_reset_timeout', 1)
38  self.skip_resetskip_reset = config.get('skip_reset', False)
39  self.serialserial = None
40 
41  # Check if serial port for given target_id changed
42  # If it does we will use new port to open connections and make sure reset plugin
43  # later can reuse opened already serial port
44  #
45  # Note: This listener opens serial port and keeps connection so reset plugin uses
46  # serial port object not serial port name!
47  serial_port = HostTestPluginBase().check_serial_port_ready(self.portport, target_id=self.target_idtarget_id, timeout=self.polling_timeoutpolling_timeoutpolling_timeout)
48  if serial_port is None:
49  raise ConnectorPrimitiveException("Serial port not ready!")
50 
51  if serial_port != self.portport:
52  # Serial port changed for given targetID
53  self.loggerlogger.prn_inf("serial port changed from '%s to '%s')"% (self.portport, serial_port))
54  self.portport = serial_port
55 
56  startTime = time.time()
57  self.loggerlogger.prn_inf("serial(port=%s, baudrate=%d, read_timeout=%s, write_timeout=%d)"% (self.portport, self.baudratebaudrate, self.read_timeoutread_timeout, self.write_timeoutwrite_timeout))
58  while time.time() - startTime < self.polling_timeoutpolling_timeoutpolling_timeout:
59  try:
60  # TIMEOUT: While creating Serial object timeout is delibrately passed as 0. Because blocking in Serial.read
61  # impacts thread and mutliprocess functioning in Python. Hence, instead in self.read() s delay (sleep()) is
62  # inserted to let serial buffer collect data and avoid spinning on non blocking read().
63  self.serialserial = Serial(self.portport, baudrate=self.baudratebaudrate, timeout=0, write_timeout=self.write_timeoutwrite_timeout)
64  except SerialException as e:
65  self.serialserial = None
66  self.LAST_ERRORLAST_ERRORLAST_ERROR = "connection lost, serial.Serial(%s, %d, %d, %d): %s"% (self.portport,
67  self.baudratebaudrate,
68  self.read_timeoutread_timeout,
69  self.write_timeoutwrite_timeout,
70  str(e))
71  self.loggerlogger.prn_err(str(e))
72  self.loggerlogger.prn_err("Retry after 1 sec until %s seconds" % self.polling_timeoutpolling_timeoutpolling_timeout)
73  else:
74  if not self.skip_resetskip_reset:
75  self.reset_dev_via_serialreset_dev_via_serial(delay=self.forced_reset_timeoutforced_reset_timeout)
76  break
77  time.sleep(1)
78 
79  def reset_dev_via_serial(self, delay=1):
80  """! Reset device using selected method, calls one of the reset plugins """
81  reset_type = self.configconfig.get('reset_type', 'default')
82  if not reset_type:
83  reset_type = 'default'
84  disk = self.configconfig.get('disk', None)
85 
86  self.loggerlogger.prn_inf("reset device using '%s' plugin..."% reset_type)
87  result = host_tests_plugins.call_plugin('ResetMethod',
88  reset_type,
89  serial=self.serialserial,
90  disk=disk,
91  target_id=self.target_idtarget_id,
92  polling_timeout=self.configconfig.get('polling_timeout'))
93  # Post-reset sleep
94  if delay:
95  self.loggerlogger.prn_inf("waiting %.2f sec after reset"% delay)
96  time.sleep(delay)
97  self.loggerlogger.prn_inf("wait for it...")
98  return result
99 
100  def read(self, count):
101  """! Read data from serial port RX buffer """
102  # TIMEOUT: Since read is called in a loop, wait for self.timeout period before calling serial.read(). See
103  # comment on serial.Serial() call above about timeout.
104  time.sleep(self.read_timeoutread_timeout)
105  c = str()
106  try:
107  if self.serialserial:
108  c = self.serialserial.read(count)
109  except SerialException as e:
110  self.serialserial = None
111  self.LAST_ERRORLAST_ERRORLAST_ERROR = "connection lost, serial.read(%d): %s"% (count, str(e))
112  self.loggerlogger.prn_err(str(e))
113  return c
114 
115  def write(self, payload, log=False):
116  """! Write data to serial port TX buffer """
117  try:
118  if self.serialserial:
119  self.serialserial.write(payload.encode('utf-8'))
120  if log:
121  self.loggerlogger.prn_txd(payload)
122  return True
123  except SerialException as e:
124  self.serialserial = None
125  self.LAST_ERRORLAST_ERRORLAST_ERROR = "connection lost, serial.write(%d bytes): %s"% (len(payload), str(e))
126  self.loggerlogger.prn_err(str(e))
127  return False
128 
129  def flush(self):
130  if self.serialserial:
131  self.serialserial.flush()
132 
133  def connected(self):
134  return bool(self.serialserial)
135 
136  def finish(self):
137  if self.serialserial:
138  self.serialserial.close()
139 
140  def reset(self):
141  self.reset_dev_via_serialreset_dev_via_serial(self.forced_reset_timeoutforced_reset_timeout)
142 
143  def __del__(self):
144  self.finishfinishfinish()
def finish(self)
Handle DUT dtor like (close resource) operations here.
def reset_dev_via_serial(self, delay=1)
Reset device using selected method, calls one of the reset plugins.
def finish(self)
Handle DUT dtor like (close resource) operations here.
def write(self, payload, log=False)
Write data to serial port TX buffer.