# Mbed Host Tests
# conn_primitive_remote.py
# (Source listing extracted from the generated documentation of this file.)
#!/usr/bin/env python
"""
mbed SDK
Copyright (c) 2011-2016 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from mbed_host_tests import DEFAULT_BAUD_RATE
from mbed_host_tests.host_tests_conn_proxy.conn_primitive import ConnectorPrimitive


class RemoteConnectorPrimitive(ConnectorPrimitive):
    """! Connector primitive that talks to a DUT through a remote Global Resource Manager (GRM)

    @details During construction the GRM client module is imported by name
             (config key 'grm_module'), a resource matching the requested
             platform name (and optional tags) is allocated, and the resource
             is then flashed, connected and reset.
             NOTE(review): 'self.logger' and 'self.LAST_ERROR' are not defined
             here; presumably they come from ConnectorPrimitive - confirm.
    """

    def __init__(self, name, config, importer=__import__):
        """! Store the configuration and initialize the remote resource
        @param name Connector name, forwarded to ConnectorPrimitive.__init__
        @param config Dict-like configuration. Keys read: 'target_id', 'grm_host',
                      'grm_port' (default 8000), 'grm_module' (default 'unknown'),
                      'platform_name', 'baudrate' (default DEFAULT_BAUD_RATE),
                      'image_path' and the optional comma separated 'tags'
        @param importer Callable used to import the GRM module by name
        """
        ConnectorPrimitive.__init__(self, name)

        # Global Resource Mgr tool-kit
        # Bound before any config parsing: int() below may raise, and __del__
        # (via finish() -> allocated()) reads these attributes even when the
        # constructor did not run to completion.
        self.remote_module = None
        self.selected_resource = None
        self.client = None

        self.config = config
        self.target_id = self.config.get('target_id', None)
        self.grm_host = config.get('grm_host', None)
        self.grm_port = int(config.get('grm_port', 8000))
        self.grm_module = config.get('grm_module', 'unknown')
        self.platform_name = config.get('platform_name', None)
        self.baudrate = config.get('baudrate', DEFAULT_BAUD_RATE)
        self.image_path = config.get('image_path', None)
        self.allocate_requirements = {"platform_name": self.platform_name}

        # Optional tags: "a,b" becomes {"a": True, "b": True}
        if self.config.get("tags"):
            self.allocate_requirements["tags"] = {}
            for tag in config["tags"].split(','):
                self.allocate_requirements["tags"][tag] = True

        # Initialize remote resource manager
        self.__remote_init(importer)

    def __remote_init(self, importer):
        """! Initialize DUT using GRM APIs
        @param importer Callable used to import the GRM module named by 'grm_module'
        @return True when the resource was allocated, flashed, connected and reset;
                False when the module import, the allocation or any of the
                flash/connect/reset steps failed
        """

        # We want to load global resource manager module by name from command line (switch --grm)
        try:
            self.remote_module = importer(self.grm_module)
        except ImportError as error:
            self.logger.prn_err("unable to load global resource manager '%s' module!" % self.grm_module)
            self.logger.prn_err(str(error))
            self.remote_module = None
            return False

        self.logger.prn_inf("remote resources initialization: remote(host=%s, port=%s)" %
                            (self.grm_host, self.grm_port))

        # Connect to remote global resource manager
        self.client = self.remote_module.create(host=self.grm_host, port=self.grm_port)

        # First get the resources
        resources = self.client.get_resources()
        self.logger.prn_inf("remote resources count: %d" % len(resources))

        # Query for available resource
        # Automatic selection and allocation of a resource
        try:
            self.selected_resource = self.client.allocate(self.allocate_requirements)
        except Exception as error:
            self.logger.prn_err("can't allocate resource: '%s', reason: %s" % (self.platform_name, str(error)))
            return False

        # Remote DUT connection, flashing and reset...
        try:
            self.__remote_flashing(self.image_path, forceflash=True)
            self.__remote_connect(baudrate=self.baudrate)
            self.__remote_reset()
        except Exception as error:
            self.logger.prn_err(str(error))
            # Do not keep the resource allocated when bring-up failed
            self.__remote_release()
            return False
        return True

    def __remote_connect(self, baudrate=DEFAULT_BAUD_RATE):
        """! Open remote connection to DUT
        @param baudrate Baud rate passed to the GRM module's SerialParameters
        @details Raises Exception when no resource is selected; errors from
                 open_connection() are logged and re-raised
        """
        self.logger.prn_inf("opening connection to platform at baudrate='%s'" % baudrate)
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            serial_parameters = self.remote_module.SerialParameters(baudrate=baudrate)
            self.selected_resource.open_connection(parameters=serial_parameters)
        except Exception:
            self.logger.prn_inf("open_connection() failed")
            raise

    def __remote_disconnect(self):
        """! Close the remote connection to DUT if it is open
        @details Raises Exception when no resource is selected; errors from
                 close_connection() are logged and not propagated
        """
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            if self.connected():
                self.selected_resource.close_connection()
        except Exception as error:
            self.logger.prn_err("RemoteConnectorPrimitive.disconnect() failed, reason: " + str(error))

    def __remote_reset(self):
        """! Use GRM remote API to reset DUT
        @details Raises Exception when no resource is selected or when the
                 resource's reset() returns False; errors are logged and re-raised
        """
        self.logger.prn_inf("remote resources reset...")
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            if self.selected_resource.reset() is False:
                raise Exception("remote resources reset failed!")
        except Exception:
            self.logger.prn_inf("reset() failed")
            raise

    def __remote_flashing(self, filename, forceflash=False):
        """! Use GRM remote API to flash DUT
        @param filename Image path handed to the resource's flash()
        @param forceflash Forwarded to flash() as the 'forceflash' keyword
        @details Raises Exception when no resource is selected or when flash()
                 returns False; errors are logged and re-raised
        """
        self.logger.prn_inf("remote resources flashing with '%s'..." % filename)
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            if self.selected_resource.flash(filename, forceflash=forceflash) is False:
                raise Exception("remote resource flashing failed!")
        except Exception:
            self.logger.prn_inf("flash() failed")
            raise

    def read(self, count):
        """! Read 'count' bytes of data from DUT
        @param count Number of bytes requested from the resource
        @return Data returned by the resource; an empty str when the read failed
        @details Raises Exception when the resource is not connected
        """
        if not self.connected():
            raise Exception("remote resource not exists!")
        data = str()
        try:
            data = self.selected_resource.read(count)
        except Exception as error:
            self.logger.prn_err("RemoteConnectorPrimitive.read(%d): %s" % (count, str(error)))
        return data

    def write(self, payload, log=False):
        """! Write 'payload' to DUT
        @param payload Data handed to the resource's write()
        @param log When True the payload is also logged with prn_txd()
        @return True when the payload was written; False when the resource is
                not connected or the write raised (LAST_ERROR is set then)
        """
        if self.connected():
            try:
                self.selected_resource.write(payload)
                if log:
                    self.logger.prn_txd(payload)
                return True
            except Exception as error:
                self.LAST_ERROR = "remote write error: %s" % str(error)
                self.logger.prn_err(str(error))
        return False

    def flush(self):
        """! No-op for the remote connector """
        pass

    def allocated(self):
        """! Check if a remote resource is currently allocated
        @return Truthy when the GRM module is loaded, a resource is selected and
                that resource reports 'is_allocated'; falsy otherwise
        """
        return self.remote_module and self.selected_resource and self.selected_resource.is_allocated

    def connected(self):
        """! Check if there is a connection to DUT
        @return Truthy when a resource is allocated and reports 'is_connected'
        """
        return self.allocated() and self.selected_resource.is_connected

    def __remote_release(self):
        """! Release the allocated resource and forget it
        @details Does nothing when no resource is allocated; errors from
                 release() are logged and not propagated
        """
        try:
            if self.allocated():
                self.selected_resource.release()
                self.selected_resource = None
        except Exception as error:
            self.logger.prn_err("RemoteConnectorPrimitive.release failed, reason: " + str(error))

    def finish(self):
        """! Handle DUT dtor like (close resource) operations here """
        # Finally once we're done with the resource
        # we disconnect and release the allocation
        if self.allocated():
            self.__remote_disconnect()
            self.__remote_release()

    def reset(self):
        """! Reset DUT through the GRM remote API """
        self.__remote_reset()

    def __del__(self):
        """! Release the remote resource when the connector is garbage collected """
        self.finish()
# Cross-referenced documentation:
#   finish(self): Handle DUT dtor like (close resource) operations here.
#   connected(self): Check if there is a connection to DUT.