[Automated-testing] [PATCH v3] devauto: Method Implementation for np0801dt
Edwin Plauchu
edwin.plauchu.camacho at linux.intel.com
Thu Jun 30 13:00:43 PDT 2016
From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
The remote power management system NP-0801D(T) has been implemented.
Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
---
devauto/rps/hw/np0801dt.py | 105 +++++++++++++++++++++++++++++++++++++++++++--
devauto/rps/hw/rpsgen.py | 20 +++++++--
2 files changed, 117 insertions(+), 8 deletions(-)
diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
index b1a456d..fec8752 100644
--- a/devauto/rps/hw/np0801dt.py
+++ b/devauto/rps/hw/np0801dt.py
@@ -5,9 +5,106 @@ At the time of the implementation, the reference website is:
http://www.synaccess-net.com/np-0801dt/
"""
-from rpsgen import RPSGen
+from rpsgen import RPSGen, RPSGenHwException, RPSGenFailException
class np0801dt(RPSGen):
- outlet_count=8
- def __init__(self):
- super(outlet_count)
+
+ __SLOT_STATUS = {'ON':1, 'OFF':0}
+ __SLOT_MAX = 8
+ __DEFAULT_SERVER_IP = '192.168.1.100'
+ __DEFAULT_ADMIN = 'admin'
+ __DEFAULT_PASSWORD = 'admin'
+ __FAIL_CODE = b'$AF'
+
+ __ACTIONS = {
+ 'AlterAllSlots':{'inst_code':'$A7', 'breathe_time':6},
+ 'AlterSlot':{'inst_code':'$A3', 'breathe_time':2},
+ }
+
+ def __init__(self, *args, **kwargs):
+ super(np0801dt, self).__init__(self.__SLOT_MAX)
+ self.__device_ip = kwargs.get('device_ip', self.__DEFAULT_SERVER_IP)
+ self.__adm_auth_info = {
+ 'username':kwargs.get('username', self.__DEFAULT_ADMIN),
+ 'password':kwargs.get('password', self.__DEFAULT_PASSWORD),
+ }
+
+ def __exec_inst(self, inst_code, breathe_time, cmd_arg_1, cmd_arg_2):
+ """Execute instruction over switch."""
+ import urllib.parse
+ import urllib.error
+ import time
+ url_params = [inst_code]
+ args = [ d for d in (cmd_arg_1, cmd_arg_2) if d or d == 0 ]
+ for a in (args):
+ arg_str = a if isinstance(a, str) else str(a)
+ if arg_str:
+ url_params.append(" ")
+ url_params.append(arg_str)
+ try:
+ reply = self.__inject(urllib.parse.unquote(''.join(url_params)))
+ time.sleep(breathe_time)
+ except (urllib.error.URLError, urllib.error.HTTPError) as error:
+ raise RPSGenHwException("Problems with np-0801dt CGI-API", error)
+
+ def __inject(self, data):
+ """Injects an instruction in np-0801dx CGI-API.
+
+ It shall receive an unquote string to be pushed as parameters of a
+ HTTP-GET request
+ """
+ import urllib.request
+ import base64
+ from array import array
+
+ auth_str = "{0}:{1}".format(self.__adm_auth_info['username'], self.__adm_auth_info['password'])
+ answer = "http://{0}/cmd.cgi?{1}".format(self.__device_ip, urllib.request.pathname2url(data))
+ basic64 = b"Basic " + base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'')
+ req = urllib.request.Request(answer)
+ req.add_header("Authorization", basic64)
+ with urllib.request.urlopen(req) as response:
+ return response.read().replace(b'\r\n', b'')
+
+ def turn_all_on(self):
+ action = self.__ACTIONS['AlterAllSlots']
+ try:
+ reply = self.__exec_inst(
+ action['inst_code'], action['breathe_time'],
+ self.__SLOT_STATUS['ON'], None)
+ if reply == self.__FAIL_CODE:
+ raise RPSGenFailException("RPS failed when turn all outlets on",None)
+ except (RPSGenHwException, RPSGenFailException) as e:
+ raise
+
+ def turn_all_off(self):
+ action = self.__ACTIONS['AlterAllSlots']
+ try:
+ reply = self.__exec_inst(
+ action['inst_code'], action['breathe_time'],
+ self.__SLOT_STATUS['OFF'], None)
+ if reply == self.__FAIL_CODE:
+ raise RPSGenFailException("RPS failed when turn all outlets off",None)
+ except (RPSGenHwException, RPSGenFailException) as e:
+ raise
+
+ def turn_on_by_id(self, outlet_id):
+ action = self.__ACTIONS['AlterSlot']
+ try:
+ reply = self.__exec_inst(
+ action['inst_code'], action['breathe_time'],
+ outlet_id, self.__SLOT_STATUS['ON'])
+ if reply == self.__FAIL_CODE:
+ raise RPSGenFailException("RPS failed when turn outlet on",outlet_id)
+ except (RPSGenHwException, RPSGenFailException) as e:
+ raise
+
+ def turn_off_by_id(self, outlet_id):
+ action = self.__ACTIONS['AlterSlot']
+ try:
+ reply = self.__exec_inst(
+ action['inst_code'], action['breathe_time'],
+ outlet_id, self.__SLOT_STATUS['OFF'])
+ if reply == self.__FAIL_CODE:
+ raise RPSGenFailException("RPS failed when turn outlet off",outlet_id)
+ except (RPSGenHwException, RPSGenFailException) as e:
+ raise
diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
index dea0119..8eaee0f 100644
--- a/devauto/rps/hw/rpsgen.py
+++ b/devauto/rps/hw/rpsgen.py
@@ -13,11 +13,23 @@ class RPSGen:
def __init__(self, outlet_count):
self.outlet_count = outlet_count
- def turn_all_on():
+ def turn_all_on(self):
pass
- def turn_all_off():
+ def turn_all_off(self):
pass
- def turn_on_by_id(outlet_id):
+ def turn_on_by_id(self,outlet_id):
pass
- def turn_off_by_id(outlet_id):
+ def turn_off_by_id(self,outlet_id):
pass
+
+class RPSGenHwException(Exception):
+ """Remote power switch base exception for hw."""
+ def __init__(self, msg, edata):
+ super(RPSGenHwException, self).__init__(msg)
+ self.edata = edata
+
+class RPSGenFailException(Exception):
+ """Remote power switch base exception for fails."""
+ def __init__(self, msg, edata):
+ super(RPSGenFailException, self).__init__(msg)
+ self.edata = edata
--
1.9.1
More information about the automated-testing
mailing list