[Automated-testing] [PATCH] devauto: Method Implementation for np0801dt
Edwin Plauchu
edwin.plauchu.camacho at linux.intel.com
Tue Jun 28 09:07:37 PDT 2016
From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
The remote power management system NP-0801D(T) has been
implemented by utilizing np0801dx.py which allows take over series T or B.
Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
---
devauto/rps/hw/np0801dt.py | 39 +++++++++-
devauto/rps/hw/np0801dx.py | 185 +++++++++++++++++++++++++++++++++++++++++++++
devauto/rps/hw/rpsgen.py | 14 +++-
3 files changed, 231 insertions(+), 7 deletions(-)
create mode 100644 devauto/rps/hw/np0801dx.py
diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
index b1a456d..440612e 100644
--- a/devauto/rps/hw/np0801dt.py
+++ b/devauto/rps/hw/np0801dt.py
@@ -5,9 +5,42 @@ At the time of the implementation, the reference website is:
http://www.synaccess-net.com/np-0801dt/
"""
-from rpsgen import RPSGen
+from rpsgen import RPSGen, RPSGenException
+from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
+from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot, NpCmdRebootSlot
+
class np0801dt(RPSGen):
- outlet_count=8
+
def __init__(self):
- super(outlet_count)
+ super(np0801dt, self).__init__(Np0801Dx.slot_max)
+ device_ip = '192.168.1.100' #This shall come from a config file
+ adm_auth_info = { 'username' : 'admin', 'password' : 'admin' } #This shall come from a config file
+ self.__applier = NpCmdApplier(Np0801Dx(device_ip, adm_auth_info))
+
+ def turn_all_on(self):
+ self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['ON']))
+ results = self.__applier.update()
+ self.__eval_except(results.pop())
+
+ def turn_all_off(self):
+ self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['OFF']))
+ results = self.__applier.update()
+ self.__eval_except(results.pop())
+
+ def turn_on_by_id(self, outlet_id):
+ self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['ON']))
+
+ def turn_off_by_id(self, outlet_id):
+ self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['OFF']))
+
+ def __eval_except(self, cmd_rc):
+ if cmd_rc == Np0801Dx.cmd_status['CMD_FAIL']:
+ raise RPSGenException("Failed action", cmd_rc)
+ if cmd_rc == Np0801Dx.cmd_status['HW_FAIL']:
+ raise RPSGenException("Experimenting a hw fail",cmd_rc)
+
+ def __ac_slot(self, slot_number, status):
+ self.__applier.add( NpCmdAlterSlot(slot_number, status))
+ results = self.__applier.update()
+ return results.pop()
diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
new file mode 100644
index 0000000..12bb76a
--- /dev/null
+++ b/devauto/rps/hw/np0801dx.py
@@ -0,0 +1,185 @@
+"""
+np0801d(t or b) Remote power switch control implementation
+
+At the time of the implementation, the reference website is:
+
+http://www.synaccess-net.com/np-0801dt/
+"""
+
+class Np0801Dx():
+ """Represents capabilities and features of np0801dx.
+
+ Ip address and administration credentials
+ of np-0801dx shall be required to control
+ """
+ cmd_status = {'CMD_OK': 0, 'CMD_FAIL': -1, 'HW_FAIL': -2}
+ slot_status = {'ON': 1, 'OFF': 0}
+ slot_max = 8
+
+ def __init__(self, hostname, adm_auth_info):
+ self.__host = hostname
+ self.__adm_auth_info = adm_auth_info
+
+ def do(self, data):
+ """Do a command through np-0801dx CGI-API.
+
+ It shall receive an unquote string to be pushed
+ as parameters of a HTTP-GET request
+ """
+ import urllib.request
+ import base64
+ from array import array
+
+ auth_str = ('%s:%s' % (self.__adm_auth_info['username'], self.__adm_auth_info['password']))
+ answer = "http://{0}/cmd.cgi?{1}".format(self.__host, urllib.request.pathname2url(data))
+ basic64 = b"Basic " + base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'')
+ req = urllib.request.Request(answer)
+ req.add_header("Authorization", basic64)
+ with urllib.request.urlopen(req) as response:
+ return response.read().replace(b'\r\n', b'')
+
+class NpCmdApplier:
+ """Applies commands to Np0801Dx instance.
+
+ This class makes possible the creation of
+ command sequences to be applied in a row.
+ A sequence shall be compounded by one or
+ more commands
+ """
+ def __init__(self, pms):
+ self.commands = []
+
+ if not isinstance(pms, Np0801Dx):
+ raise ValueError('Not valid Np0801Dx instance')
+
+ self.np_model = pms
+
+ def add(self, command):
+ """Add command to FIFO that runs on an update call."""
+ self.commands.append(command)
+
+ def update(self):
+ """Run the overall of commands from FIFO.
+
+ Every command shall append its return
+ upon a list of replies, such list shall
+ abide with FIFO order
+ """
+ replies = []
+ for element in self.commands:
+ replies.append(element.go(self.np_model))
+ self.commands = []
+ return replies
+
+
+class NpMasterCmd:
+ """Template pattern for required np-0801dx commands.
+
+ Any np-0801dx command implementing this
+ template shall supply an instruction code
+ with a number of seconds to concrete the command
+ """
+ def __init__(self, ic, tc):
+ self.inst_code = ic
+ self.breathe_time = tc
+ self.cmd_arg_1 = None
+ self.cmd_arg_2 = None
+
+ def go(self, pms):
+ """Event handler that reacts to an applier update."""
+ import urllib.parse
+ import urllib.error
+ import time
+ url_params = [self.inst_code]
+ args = [ d for d in (self.cmd_arg_1, self.cmd_arg_2) if d or d == 0 ]
+ for a in (args):
+ arg_str = a if isinstance(a, str) else str(a)
+ if arg_str:
+ url_params.append(" ")
+ url_params.append(arg_str)
+ data = urllib.parse.unquote(''.join(url_params))
+
+ try:
+ reply = pms.do(data)
+ time.sleep(self.breathe_time)
+ return self.parse_reply(reply)
+ except (urllib.error.URLError, urllib.error.HTTPError) as error:
+ return Np0801Dx.cmd_status['HW_FAIL']
+
+ def parse_reply(self, reply):
+ """To be implemented as reply parser of any command."""
+ pass
+
+
+class NpCmdAlterSlot(NpMasterCmd):
+ """Implementation of turn off/on over specific slot."""
+ def __init__(self, slot, st):
+
+ super(NpCmdAlterSlot, self).__init__('$A3', 3)
+
+ if not isinstance(slot, int):
+ raise ValueError('Not valid socket selected')
+
+ if not isinstance(st, int):
+ raise ValueError('Not valid state selected')
+
+ self.cmd_arg_1 = slot
+ self.cmd_arg_2 = st
+
+ def parse_reply(self, reply):
+ """Return CMD_OK when success otherwise CMD_FAIL."""
+ if reply == b'$A0':
+ return Np0801Dx.cmd_status['CMD_OK']
+ return Np0801Dx.cmd_status['CMD_FAIL']
+
+class NpCmdRebootSlot(NpMasterCmd):
+ """Implementation of reboot over specific slot."""
+ def __init__(self, slot):
+
+ super(NpCmdRebootSlot, self).__init__('$A4', 4)
+
+ if not isinstance(slot, int):
+ raise ValueError('Not valid socket selected')
+
+ self.cmd_arg_1 = slot
+
+ def parse_reply(self, reply):
+ """Return CMD_OK when success otherwise CMD_FAIL."""
+ if reply == b'$A0':
+ return Np0801Dx.cmd_status['CMD_OK']
+ return Np0801Dx.cmd_status['CMD_FAIL']
+
+class NpCmdMonitor(NpMasterCmd):
+ """Implementation of getting info to monitor."""
+ def __init__(self):
+
+ super(NpCmdMonitor, self).__init__('$A5', 2)
+
+ def parse_reply(self, reply):
+ """Return status info when measures otherwise None."""
+ rv = {'ss':None, 'amps':None, 'temp':None}
+ vals = (reply).decode('utf-8').split(',')
+
+ if vals[0] == '$A0':
+ if len(vals) == 4:
+ rv['amps'] = vals[2]
+ rv['temp'] = vals[3]
+ rv['ss'] = vals[1]
+ return rv
+
+class NpCmdAlterAllSlots(NpMasterCmd):
+ """Implementation of turn off/on on all the slots."""
+ def __init__(self, st):
+
+ super(NpCmdAlterAllSlots, self).__init__('$A7', 7)
+
+ if not isinstance(st, int):
+ raise ValueError('Not valid state selected')
+
+ self.cmd_arg_1 = st
+
+ def parse_reply(self, reply):
+ """Return CMD_OK when success otherwise CMD_FAIL."""
+ if reply == b'$A0':
+ return Np0801Dx.cmd_status['CMD_OK']
+ return Np0801Dx.cmd_status['CMD_FAIL']
diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
index dea0119..53d0639 100644
--- a/devauto/rps/hw/rpsgen.py
+++ b/devauto/rps/hw/rpsgen.py
@@ -13,11 +13,17 @@ class RPSGen:
def __init__(self, outlet_count):
self.outlet_count = outlet_count
- def turn_all_on():
+ def turn_all_on(self):
pass
- def turn_all_off():
+ def turn_all_off(self):
pass
- def turn_on_by_id(outlet_id):
+ def turn_on_by_id(self,outlet_id):
pass
- def turn_off_by_id(outlet_id):
+ def turn_off_by_id(self,outlet_id):
pass
+
+class RPSGenException(Exception):
+ """Remote power switch base exception."""
+ def __init__(self, msg, edata):
+ super(RPSGenException, self).__init__(msg)
+ self.edata = edata
--
1.9.1
More information about the automated-testing
mailing list