[Automated-testing] [PATCH v2] devauto: Method Implementation for np0801dt

Edwin Plauchu edwin.plauchu.camacho at linux.intel.com
Tue Jun 28 09:10:33 PDT 2016


From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>

The remote power management system NP-0801D(T) has been
implemented by utilizing np0801dx.py which allows take over series T or B.

Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
---
 devauto/rps/hw/np0801dt.py |  39 +++++++++-
 devauto/rps/hw/np0801dx.py | 185 +++++++++++++++++++++++++++++++++++++++++++++
 devauto/rps/hw/rpsgen.py   |  14 +++-
 3 files changed, 231 insertions(+), 7 deletions(-)
 create mode 100644 devauto/rps/hw/np0801dx.py

diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
index b1a456d..440612e 100644
--- a/devauto/rps/hw/np0801dt.py
+++ b/devauto/rps/hw/np0801dt.py
@@ -5,9 +5,42 @@ At the time of the implementation, the reference website is:
 
 http://www.synaccess-net.com/np-0801dt/
 """
-from rpsgen import RPSGen
+from rpsgen import RPSGen, RPSGenException
+from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
+from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot, NpCmdRebootSlot
+
 
 class np0801dt(RPSGen):
-    outlet_count=8
+
     def __init__(self):
-        super(outlet_count)
+        super(np0801dt, self).__init__(Np0801Dx.slot_max)
+        device_ip = '192.168.1.100' #This shall come from a config file
+        adm_auth_info = { 'username' : 'admin', 'password' : 'admin' } #This shall come from a config file
+        self.__applier = NpCmdApplier(Np0801Dx(device_ip, adm_auth_info))
+
+    def turn_all_on(self):
+        self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['ON']))
+        results = self.__applier.update()
+        self.__eval_except(results.pop())
+
+    def turn_all_off(self):
+        self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['OFF']))
+        results = self.__applier.update()
+        self.__eval_except(results.pop())
+
+    def turn_on_by_id(self, outlet_id):
+        self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['ON']))
+
+    def turn_off_by_id(self, outlet_id):
+        self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['OFF']))
+
+    def __eval_except(self, cmd_rc):
+        if cmd_rc == Np0801Dx.cmd_status['CMD_FAIL']:
+            raise RPSGenException("Failed action", cmd_rc)
+        if cmd_rc == Np0801Dx.cmd_status['HW_FAIL']:
+            raise RPSGenException("Experimenting a hw fail",cmd_rc)
+
+    def __ac_slot(self, slot_number, status):
+        self.__applier.add( NpCmdAlterSlot(slot_number, status))
+        results = self.__applier.update()
+        return results.pop()
diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
new file mode 100644
index 0000000..12bb76a
--- /dev/null
+++ b/devauto/rps/hw/np0801dx.py
@@ -0,0 +1,185 @@
+"""
+np0801d(t or b) Remote power switch control implementation
+
+At the time of the implementation, the reference website is:
+
+http://www.synaccess-net.com/np-0801dt/
+"""
+
+class Np0801Dx():
+    """Represents capabilities and features of np0801dx.
+
+    Ip address and administration credentials
+    of np-0801dx shall be required to control
+    """
+    cmd_status = {'CMD_OK': 0, 'CMD_FAIL': -1, 'HW_FAIL': -2}
+    slot_status = {'ON': 1, 'OFF': 0}
+    slot_max = 8
+
+    def __init__(self, hostname, adm_auth_info):
+        self.__host = hostname
+        self.__adm_auth_info = adm_auth_info
+
+    def do(self, data):
+        """Do a command through np-0801dx CGI-API.
+
+        It shall receive an unquote string to be pushed
+        as parameters of a HTTP-GET request
+        """
+        import urllib.request
+        import base64
+        from array import array
+
+        auth_str = ('%s:%s' % (self.__adm_auth_info['username'], self.__adm_auth_info['password']))
+        answer = "http://{0}/cmd.cgi?{1}".format(self.__host, urllib.request.pathname2url(data))
+        basic64 = b"Basic " + base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'') 
+        req = urllib.request.Request(answer)
+        req.add_header("Authorization", basic64)
+        with urllib.request.urlopen(req) as response:
+            return response.read().replace(b'\r\n', b'')
+
+class NpCmdApplier:
+    """Applies commands to Np0801Dx instance.
+
+    This class makes possible the creation of
+    command sequences to be applied in a row.
+    A sequence shall be compounded by one or
+    more commands
+    """
+    def __init__(self, pms):
+        self.commands = []
+
+        if not isinstance(pms, Np0801Dx):
+            raise ValueError('Not valid Np0801Dx instance')
+
+        self.np_model = pms
+
+    def add(self, command):
+        """Add command to FIFO that runs on an update call."""
+        self.commands.append(command)
+
+    def update(self):
+        """Run the overall of commands from FIFO.
+
+        Every command shall append its return
+        upon a list of replies, such list shall
+        abide with FIFO order
+        """
+        replies = []
+        for element in self.commands:
+            replies.append(element.go(self.np_model))
+        self.commands = []
+        return replies
+
+
+class NpMasterCmd:
+    """Template pattern for required np-0801dx commands.
+
+    Any np-0801dx command implementing this
+    template shall supply an instruction code
+    with a number of seconds to concrete the command
+    """
+    def __init__(self, ic, tc):
+        self.inst_code = ic
+        self.breathe_time = tc
+        self.cmd_arg_1 = None
+        self.cmd_arg_2 = None
+
+    def go(self, pms):
+        """Event handler that reacts to an applier update."""
+        import urllib.parse
+        import urllib.error
+        import time
+        url_params = [self.inst_code]
+        args = [ d for d in (self.cmd_arg_1, self.cmd_arg_2) if d or d == 0 ]
+        for a in (args):
+            arg_str = a if isinstance(a, str) else str(a)
+            if arg_str:
+                url_params.append(" ")
+                url_params.append(arg_str)
+        data = urllib.parse.unquote(''.join(url_params))
+
+        try:
+            reply = pms.do(data)
+            time.sleep(self.breathe_time)
+            return self.parse_reply(reply)
+        except (urllib.error.URLError, urllib.error.HTTPError) as error:
+            return Np0801Dx.cmd_status['HW_FAIL']
+
+    def parse_reply(self, reply):
+        """To be implemented as reply parser of any command."""
+        pass
+
+
+class NpCmdAlterSlot(NpMasterCmd):
+    """Implementation of turn off/on over specific slot."""
+    def __init__(self, slot, st):
+
+        super(NpCmdAlterSlot, self).__init__('$A3', 3)
+
+        if not isinstance(slot, int):
+            raise ValueError('Not valid socket selected')
+
+        if not isinstance(st, int):
+            raise ValueError('Not valid state selected')
+
+        self.cmd_arg_1 = slot
+        self.cmd_arg_2 = st
+
+    def parse_reply(self, reply):
+        """Return CMD_OK when success otherwise CMD_FAIL."""
+        if reply == b'$A0':
+            return Np0801Dx.cmd_status['CMD_OK']
+        return Np0801Dx.cmd_status['CMD_FAIL']
+
+class NpCmdRebootSlot(NpMasterCmd):
+    """Implementation of reboot over specific slot."""
+    def __init__(self, slot):
+
+        super(NpCmdRebootSlot, self).__init__('$A4', 4)
+
+        if not isinstance(slot, int):
+            raise ValueError('Not valid socket selected')
+
+        self.cmd_arg_1 = slot
+
+    def parse_reply(self, reply):
+        """Return CMD_OK when success otherwise CMD_FAIL."""
+        if reply == b'$A0':
+            return Np0801Dx.cmd_status['CMD_OK']
+        return Np0801Dx.cmd_status['CMD_FAIL']
+
+class NpCmdMonitor(NpMasterCmd):
+    """Implementation of getting info to monitor."""
+    def __init__(self):
+
+        super(NpCmdMonitor, self).__init__('$A5', 2)
+
+    def parse_reply(self, reply):
+        """Return status info when measures otherwise None."""
+        rv = {'ss':None, 'amps':None, 'temp':None} 
+        vals = (reply).decode('utf-8').split(',')
+
+        if vals[0] == '$A0':
+            if len(vals) == 4:
+                rv['amps'] = vals[2]
+                rv['temp'] = vals[3]
+                rv['ss'] = vals[1]
+        return rv
+
+class NpCmdAlterAllSlots(NpMasterCmd):
+    """Implementation of turn off/on on all the slots."""
+    def __init__(self, st):
+
+        super(NpCmdAlterAllSlots, self).__init__('$A7', 7)
+
+        if not isinstance(st, int):
+            raise ValueError('Not valid state selected')
+
+        self.cmd_arg_1 = st
+
+    def parse_reply(self, reply):
+        """Return CMD_OK when success otherwise CMD_FAIL."""
+        if reply == b'$A0':
+            return Np0801Dx.cmd_status['CMD_OK']
+        return Np0801Dx.cmd_status['CMD_FAIL']
diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
index dea0119..53d0639 100644
--- a/devauto/rps/hw/rpsgen.py
+++ b/devauto/rps/hw/rpsgen.py
@@ -13,11 +13,17 @@ class RPSGen:
     def __init__(self, outlet_count):
         self.outlet_count = outlet_count
 
-    def turn_all_on():
+    def turn_all_on(self):
         pass
-    def turn_all_off():
+    def turn_all_off(self):
         pass
-    def turn_on_by_id(outlet_id):
+    def turn_on_by_id(self,outlet_id):
         pass
-    def turn_off_by_id(outlet_id):
+    def turn_off_by_id(self,outlet_id):
         pass
+
+class RPSGenException(Exception):
+    """Remote power switch base exception."""
+    def __init__(self, msg, edata):
+        super(RPSGenException, self).__init__(msg)
+        self.edata = edata
-- 
1.9.1



More information about the automated-testing mailing list