[Automated-testing] [PATCH v2] devauto: Method Implementation for np0801dt
Aníbal Limón
anibal.limon at linux.intel.com
Tue Jun 28 13:29:12 PDT 2016
On 06/28/2016 11:10 AM, Edwin Plauchu wrote:
> From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
>
> The remote power management system NP-0801D(T) has been
> implemented by utilizing np0801dx.py which allows take over series T or B.
>
> Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> ---
> devauto/rps/hw/np0801dt.py | 39 +++++++++-
> devauto/rps/hw/np0801dx.py | 185 +++++++++++++++++++++++++++++++++++++++++++++
> devauto/rps/hw/rpsgen.py | 14 +++-
> 3 files changed, 231 insertions(+), 7 deletions(-)
> create mode 100644 devauto/rps/hw/np0801dx.py
>
> diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
> index b1a456d..440612e 100644
> --- a/devauto/rps/hw/np0801dt.py
> +++ b/devauto/rps/hw/np0801dt.py
> @@ -5,9 +5,42 @@ At the time of the implementation, the reference website is:
>
> http://www.synaccess-net.com/np-0801dt/
> """
> -from rpsgen import RPSGen
> +from rpsgen import RPSGen, RPSGenException
> +from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
> +from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot, NpCmdRebootSlot
> +
>
> class np0801dt(RPSGen):
> - outlet_count=8
> +
> def __init__(self):
> - super(outlet_count)
> + super(np0801dt, self).__init__(Np0801Dx.slot_max)
> + device_ip = '192.168.1.100' #This shall come from a config file
> + adm_auth_info = { 'username' : 'admin', 'password' : 'admin' } #This shall come from a config file
For handle options of this module you could add *args and **kwargs in
__init__(self, *args, **kwargs) and the search for specific parameters
such as 'device_ip', 'username' and 'password' into kwargs if not found
use a default one defining a constant at class level i.e.
DEFAULT_SERVER_IP = '192.168.1.100'
> + self.__applier = NpCmdApplier(Np0801Dx(device_ip, adm_auth_info))
> +
> + def turn_all_on(self):
> + self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['ON']))
> + results = self.__applier.update()
> + self.__eval_except(results.pop())
> +
> + def turn_all_off(self):
> + self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['OFF']))
> + results = self.__applier.update()
> + self.__eval_except(results.pop())
> +
> + def turn_on_by_id(self, outlet_id):
> + self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['ON']))
> +
> + def turn_off_by_id(self, outlet_id):
> + self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['OFF']))
> +
> + def __eval_except(self, cmd_rc):
> + if cmd_rc == Np0801Dx.cmd_status['CMD_FAIL']:
> + raise RPSGenException("Failed action", cmd_rc)
> + if cmd_rc == Np0801Dx.cmd_status['HW_FAIL']:
> + raise RPSGenException("Experimenting a hw fail",cmd_rc)
> +
> + def __ac_slot(self, slot_number, status):
> + self.__applier.add( NpCmdAlterSlot(slot_number, status))
> + results = self.__applier.update()
> + return results.pop()
> diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
> new file mode 100644
> index 0000000..12bb76a
> --- /dev/null
> +++ b/devauto/rps/hw/np0801dx.py
> @@ -0,0 +1,185 @@
> +"""
> +np0801d(t or b) Remote power switch control implementation
> +
> +At the time of the implementation, the reference website is:
> +
> +http://www.synaccess-net.com/np-0801dt/
> +"""
> +
> +class Np0801Dx():
> + """Represents capabilities and features of np0801dx.
> +
> + Ip address and administration credentials
> + of np-0801dx shall be required to control
> + """
> + cmd_status = {'CMD_OK': 0, 'CMD_FAIL': -1, 'HW_FAIL': -2}
The returns needs to be standardized across modules so this definitions
needs to be difined at level of the base class RPSControl.
> + slot_status = {'ON': 1, 'OFF': 0}
> + slot_max = 8
> +
> + def __init__(self, hostname, adm_auth_info):
> + self.__host = hostname
> + self.__adm_auth_info = adm_auth_info
> +
> + def do(self, data):
> + """Do a command through np-0801dx CGI-API.
> +
> + It shall receive an unquote string to be pushed
> + as parameters of a HTTP-GET request
> + """
> + import urllib.request
> + import base64
> + from array import array
> +
> + auth_str = ('%s:%s' % (self.__adm_auth_info['username'], self.__adm_auth_info['password']))
> + answer = "http://{0}/cmd.cgi?{1}".format(self.__host, urllib.request.pathname2url(data))
> + basic64 = b"Basic " + base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'')
> + req = urllib.request.Request(answer)
> + req.add_header("Authorization", basic64)
> + with urllib.request.urlopen(req) as response:
> + return response.read().replace(b'\r\n', b'')
> +
> +class NpCmdApplier:
> + """Applies commands to Np0801Dx instance.
> +
> + This class makes possible the creation of
> + command sequences to be applied in a row.
> + A sequence shall be compounded by one or
> + more commands
> + """
> + def __init__(self, pms):
> + self.commands = []
> +
> + if not isinstance(pms, Np0801Dx):
> + raise ValueError('Not valid Np0801Dx instance')
> +
> + self.np_model = pms
> +
> + def add(self, command):
> + """Add command to FIFO that runs on an update call."""
> + self.commands.append(command)
> +
> + def update(self):
> + """Run the overall of commands from FIFO.
> +
> + Every command shall append its return
> + upon a list of replies, such list shall
> + abide with FIFO order
> + """
> + replies = []
> + for element in self.commands:
> + replies.append(element.go(self.np_model))
> + self.commands = []
> + return replies
> +
> +
> +class NpMasterCmd:
> + """Template pattern for required np-0801dx commands.
> +
> + Any np-0801dx command implementing this
> + template shall supply an instruction code
> + with a number of seconds to concrete the command
> + """
> + def __init__(self, ic, tc):
> + self.inst_code = ic
> + self.breathe_time = tc
> + self.cmd_arg_1 = None
> + self.cmd_arg_2 = None
> +
> + def go(self, pms):
> + """Event handler that reacts to an applier update."""
> + import urllib.parse
> + import urllib.error
> + import time
> + url_params = [self.inst_code]
> + args = [ d for d in (self.cmd_arg_1, self.cmd_arg_2) if d or d == 0 ]
> + for a in (args):
> + arg_str = a if isinstance(a, str) else str(a)
> + if arg_str:
> + url_params.append(" ")
> + url_params.append(arg_str)
> + data = urllib.parse.unquote(''.join(url_params))
> +
> + try:
> + reply = pms.do(data)
> + time.sleep(self.breathe_time)
> + return self.parse_reply(reply)
> + except (urllib.error.URLError, urllib.error.HTTPError) as error:
> + return Np0801Dx.cmd_status['HW_FAIL']
> +
> + def parse_reply(self, reply):
> + """To be implemented as reply parser of any command."""
> + pass
> +
> +
> +class NpCmdAlterSlot(NpMasterCmd):
> + """Implementation of turn off/on over specific slot."""
> + def __init__(self, slot, st):
> +
> + super(NpCmdAlterSlot, self).__init__('$A3', 3)
> +
> + if not isinstance(slot, int):
> + raise ValueError('Not valid socket selected')
> +
> + if not isinstance(st, int):
> + raise ValueError('Not valid state selected')
> +
> + self.cmd_arg_1 = slot
> + self.cmd_arg_2 = st
> +
> + def parse_reply(self, reply):
> + """Return CMD_OK when success otherwise CMD_FAIL."""
> + if reply == b'$A0':
> + return Np0801Dx.cmd_status['CMD_OK']
> + return Np0801Dx.cmd_status['CMD_FAIL']
> +
> +class NpCmdRebootSlot(NpMasterCmd):
> + """Implementation of reboot over specific slot."""
> + def __init__(self, slot):
> +
> + super(NpCmdRebootSlot, self).__init__('$A4', 4)
> +
> + if not isinstance(slot, int):
> + raise ValueError('Not valid socket selected')
> +
> + self.cmd_arg_1 = slot
> +
> + def parse_reply(self, reply):
> + """Return CMD_OK when success otherwise CMD_FAIL."""
> + if reply == b'$A0':
> + return Np0801Dx.cmd_status['CMD_OK']
> + return Np0801Dx.cmd_status['CMD_FAIL']
> +
> +class NpCmdMonitor(NpMasterCmd):
> + """Implementation of getting info to monitor."""
> + def __init__(self):
> +
> + super(NpCmdMonitor, self).__init__('$A5', 2)
> +
> + def parse_reply(self, reply):
> + """Return status info when measures otherwise None."""
> + rv = {'ss':None, 'amps':None, 'temp':None}
> + vals = (reply).decode('utf-8').split(',')
> +
> + if vals[0] == '$A0':
> + if len(vals) == 4:
> + rv['amps'] = vals[2]
> + rv['temp'] = vals[3]
> + rv['ss'] = vals[1]
> + return rv
Only for next step, we need to define a common interface for return the
status of monitor some device, at this step is ok to return values
directly related to the NP-0801D power switch.
> +
> +class NpCmdAlterAllSlots(NpMasterCmd):
> + """Implementation of turn off/on on all the slots."""
> + def __init__(self, st):
> +
> + super(NpCmdAlterAllSlots, self).__init__('$A7', 7)
> +
> + if not isinstance(st, int):
> + raise ValueError('Not valid state selected')
> +
> + self.cmd_arg_1 = st
> +
> + def parse_reply(self, reply):
> + """Return CMD_OK when success otherwise CMD_FAIL."""
> + if reply == b'$A0':
> + return Np0801Dx.cmd_status['CMD_OK']
> + return Np0801Dx.cmd_status['CMD_FAIL']
> diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
> index dea0119..53d0639 100644
> --- a/devauto/rps/hw/rpsgen.py
> +++ b/devauto/rps/hw/rpsgen.py
> @@ -13,11 +13,17 @@ class RPSGen:
> def __init__(self, outlet_count):
> self.outlet_count = outlet_count
>
> - def turn_all_on():
> + def turn_all_on(self):
> pass
> - def turn_all_off():
> + def turn_all_off(self):
> pass
> - def turn_on_by_id(outlet_id):
> + def turn_on_by_id(self,outlet_id):
> pass
> - def turn_off_by_id(outlet_id):
> + def turn_off_by_id(self,outlet_id):
> pass
> +
> +class RPSGenException(Exception):
> + """Remote power switch base exception."""
> + def __init__(self, msg, edata):
> + super(RPSGenException, self).__init__(msg)
> + self.edata = edata
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: OpenPGP digital signature
URL: <http://lists.yoctoproject.org/pipermail/automated-testing/attachments/20160628/60a95035/attachment.pgp>
More information about the automated-testing
mailing list