Mbed Host Tests
mbed_base.py
Go to the documentation of this file.
1 """
2 mbed SDK
3 Copyright (c) 2011-2016 ARM Limited
4 
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8 
9  http://www.apache.org/licenses/LICENSE-2.0
10 
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 
17 Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
18 """
19 
20 import json
21 import os
22 import mbed_lstools
23 from time import sleep
24 from mbed_host_tests import DEFAULT_BAUD_RATE
25 import mbed_host_tests.host_tests_plugins as ht_plugins
26 from mbed_host_tests.host_tests_logger import HtrunLogger
27 
28 
class Mbed:
    """! Base class for a host driven test
    @details This class stores information about things like disk, port, serial speed etc.
             Class is also responsible for manipulation of serial port between host and mbed device
    """
    def __init__(self, options):
        """! Store copy / reset / serial settings taken from 'options'
        @param options Object exposing the attributes read below: port, disk, target_id,
                       image_path, copy_method, retry_copy, program_cycle_s, polling_timeout,
                       baud_rate and json_test_configuration
        @details Also loads the optional JSON test configuration into 'test_cfg'.
                 An I/O error while loading is logged and leaves 'test_cfg' as None,
                 any other error (e.g. malformed JSON) is logged and re-raised.
        """
        # For compatibility with old mbed. We can use command line options for Mbed object
        # or we can pass an options object directly.
        self.options = options
        self.logger = HtrunLogger('MBED')
        # Options related to copy / reset mbed device
        self.port = self.options.port
        self.disk = self.options.disk
        self.target_id = self.options.target_id
        # Image path may arrive wrapped in double quotes; missing path becomes ''
        self.image_path = self.options.image_path.strip('"') if self.options.image_path is not None else ''
        self.copy_method = self.options.copy_method
        self.retry_copy = self.options.retry_copy
        # Delay after each copy attempt, 2.0 seconds unless specified
        self.program_cycle_s = float(self.options.program_cycle_s if self.options.program_cycle_s is not None else 2.0)
        self.polling_timeout = self.options.polling_timeout

        # Serial port settings
        self.serial_baud = DEFAULT_BAUD_RATE
        self.serial_timeout = 1

        # Users can use command to pass port speeds together with port name. E.g. COM4:115200:1
        # Format is PORT:SPEED:TIMEOUT
        port_config = self.port.split(':') if self.port else ''
        if len(port_config) == 2:
            # -p COM4:115200
            self.port = port_config[0]
            self.serial_baud = int(port_config[1])
        elif len(port_config) == 3:
            # -p COM4:115200:0.5
            self.port = port_config[0]
            self.serial_baud = int(port_config[1])
            self.serial_timeout = float(port_config[2])

        # Overriding baud rate value with command line specified value
        if self.options.baud_rate:
            self.serial_baud = self.options.baud_rate

        # Test configuration in JSON format
        self.test_cfg = None
        if self.options.json_test_configuration is not None:
            # We need to normalize path before we open file
            json_test_configuration_path = self.options.json_test_configuration.strip("\"'")
            try:
                self.logger.prn_inf("Loading test configuration from '%s'..." % json_test_configuration_path)
                with open(json_test_configuration_path) as data_file:
                    self.test_cfg = json.load(data_file)
            except IOError as e:
                self.logger.prn_err("Test configuration JSON file '{0}' I/O error({1}): {2}"
                                    .format(json_test_configuration_path, e.errno, e.strerror))
            except Exception as e:
                # Bind the exception explicitly: the handler must not fail itself
                # and hide the real problem. The error is still propagated.
                self.logger.prn_err("Test configuration JSON Unexpected error: %s" % str(e))
                raise

    def copy_image(self, image_path=None, disk=None, copy_method=None, port=None, retry_copy=5):
        """! Closure for copy_image_raw() method.
        @param image_path Path to the image file, falls back to self.image_path
        @param disk Destination disk, falls back to self.disk
        @param copy_method Copy plugin name, falls back to self.copy_method
        @param port Serial port, falls back to self.port
        @param retry_copy Maximum number of copy attempts, falls back to self.retry_copy when falsy
        @return Returns result from copy plugin (combined with the flash error check),
                False if the image is missing or no attempt was requested
        """
        def get_remount_count(disk_path, tries=2):
            """! Get the remount count from 'DETAILS.TXT' file
            @return Returns count, None if not-available
            """
            if not disk_path:
                # Without a disk there is nothing to inspect (and os.listdir(None)
                # would silently list the current working directory)
                return None

            for cur_try in range(1, tries + 1):
                try:
                    # Match the file name case-insensitively but open it under its real name
                    details_name = next(
                        (name for name in os.listdir(disk_path) if name.upper() == 'DETAILS.TXT'),
                        None)
                    if details_name is None:
                        # 'DETAILS.TXT' file not found
                        return None

                    with open(os.path.join(disk_path, details_name), 'r') as details_txt:
                        for line in details_txt:
                            if 'Remount count:' in line:
                                try:
                                    return int(line.split('Remount count:', 1)[1].strip())
                                except ValueError:
                                    # Entry present but not a number
                                    return None
                    # Remount count not found in file
                    return None

                except (IOError, OSError) as e:
                    self.logger.prn_err("Failed to get remount count due to OSError. %s" % str(e))
                    self.logger.prn_inf("Retrying in 1 second (try %s of %s)" % (cur_try, tries))
                    sleep(1)
            # Failed to get remount count
            return None

        def check_flash_error(target_id, disk, initial_remount_count):
            """! Check for flash errors
            @return Returns false if FAIL.TXT present, else true
            """
            if not target_id:
                self.logger.prn_wrn("Target ID not found: Skipping flash check and retry")
                return True

            bad_files = set(['FAIL.TXT'])
            # Re-try at max 5 times with 0.5 sec in delay
            for _ in range(5):
                # mbed_lstools.create() should be done inside the loop. Otherwise it will loop on same data.
                mbeds = mbed_lstools.create()
                mbed_list = mbeds.list_mbeds()  # list of mbeds present
                # get first item in list with a matching target_id, if present
                mbed_target = next((x for x in mbed_list if x['target_id'] == target_id), None)

                if mbed_target is not None:
                    if 'mount_point' in mbed_target and mbed_target['mount_point'] is not None:
                        mount_point = mbed_target['mount_point']

                        if initial_remount_count is not None:
                            new_remount_count = get_remount_count(disk)
                            if new_remount_count is not None and new_remount_count == initial_remount_count:
                                # Remount count unchanged since before the copy: check again later
                                sleep(0.5)
                                continue

                        try:
                            # Upper-cased name -> real name on disk
                            names_on_disk = dict((name.upper(), name) for name in os.listdir(mount_point))
                        except OSError as e:
                            self.logger.prn_wrn("Failed to enumerate disk files, retrying (%s)" % str(e))
                            sleep(0.5)
                            continue

                        common_items = [names_on_disk[name] for name in bad_files if name in names_on_disk]

                        for common_item in common_items:
                            full_path = os.path.join(mount_point, common_item)
                            self.logger.prn_err("Found %s" % (full_path))
                            bad_file_contents = "[failed to read bad file]"
                            try:
                                with open(full_path, "r") as bad_file:
                                    bad_file_contents = bad_file.read()
                            except (IOError, OSError) as error:
                                self.logger.prn_err("Error opening '%s': %s" % (full_path, error))

                            self.logger.prn_err("Error file contents:\n%s" % bad_file_contents)
                        if common_items:
                            return False
                sleep(0.5)
            return True

        # Set-up closure environment
        if not image_path:
            image_path = self.image_path
        if not disk:
            disk = self.disk
        if not copy_method:
            copy_method = self.copy_method
        if not port:
            port = self.port
        if not retry_copy:
            retry_copy = self.retry_copy
        target_id = self.target_id

        if not image_path:
            self.logger.prn_err("Error: image path not specified")
            return False

        if not os.path.isfile(image_path):
            self.logger.prn_err("Error: image file (%s) not found" % image_path)
            return False

        # retry_copy may still be 0 / None here; never enter range() with None
        # and never return an unbound result
        attempts = int(retry_copy or 0)
        if attempts <= 0:
            self.logger.prn_err("Error: no copy attempt requested (retry_copy=%s)" % retry_copy)
            return False

        result = False
        for _ in range(attempts):
            initial_remount_count = get_remount_count(disk)
            # Call proper copy method
            result = self.copy_image_raw(image_path, disk, copy_method, port)
            sleep(self.program_cycle_s)
            if not result:
                continue
            result = check_flash_error(target_id, disk, initial_remount_count)
            if result:
                break
        return result

    def copy_image_raw(self, image_path=None, disk=None, copy_method=None, port=None):
        """! Copy file depending on method you want to use. Handles exception
             and return code from shell copy commands.
        @param image_path Where is binary with target's firmware
        @param disk Destination disk passed to the plugin as 'destination_disk'
        @param copy_method Copy plugin name, None and 'default' select 'shell'
        @param port Serial port passed to the plugin as 'serial'
        @return Returns result from copy plugin
        @details Method which is actually copying image to mbed
        """
        # Select copy_method
        # We override 'default' method with 'shell' method
        copy_method = {
            None: 'shell',
            'default': 'shell',
        }.get(copy_method, copy_method)

        # NOTE: keyword 'pooling_timeout' is the name passed to the plugin, keep it as is
        result = ht_plugins.call_plugin('CopyMethod',
                                        copy_method,
                                        image_path=image_path,
                                        serial=port,
                                        destination_disk=disk,
                                        target_id=self.target_id,
                                        pooling_timeout=self.polling_timeout)
        return result

    def hw_reset(self):
        """! Performs hardware reset of target mbed device.
        @return Returns result from the 'power_cycle' reset plugin
        @details On success the serial port and mount point are refreshed from
                 the 'device_info' dictionary handed to the plugin
        """
        device_info = {}
        result = ht_plugins.call_plugin('ResetMethod',
                                        'power_cycle',
                                        target_id=self.target_id,
                                        device_info=device_info)
        if result:
            self.port = device_info['serial_port']
            self.disk = device_info['mount_point']
        return result
237 
Base class for a host driven test.
Definition: mbed_base.py:29
def copy_image(self, image_path=None, disk=None, copy_method=None, port=None, retry_copy=5)
Closure for copy_image_raw() method.
Definition: mbed_base.py:87
def copy_image_raw(self, image_path=None, disk=None, copy_method=None, port=None)
Copy file depending on method you want to use.
Definition: mbed_base.py:198