[Automated-testing] [PATCH v2] devauto: Method Implementation for np0801dt

Aníbal Limón anibal.limon at linux.intel.com
Tue Jun 28 13:29:12 PDT 2016



On 06/28/2016 11:10 AM, Edwin Plauchu wrote:
> From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> 
> The remote power management system NP-0801D(T) has been
> implemented by utilizing np0801dx.py which allows take over series T or B.
> 
> Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> ---
>  devauto/rps/hw/np0801dt.py |  39 +++++++++-
>  devauto/rps/hw/np0801dx.py | 185 +++++++++++++++++++++++++++++++++++++++++++++
>  devauto/rps/hw/rpsgen.py   |  14 +++-
>  3 files changed, 231 insertions(+), 7 deletions(-)
>  create mode 100644 devauto/rps/hw/np0801dx.py
> 
> diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
> index b1a456d..440612e 100644
> --- a/devauto/rps/hw/np0801dt.py
> +++ b/devauto/rps/hw/np0801dt.py
> @@ -5,9 +5,42 @@ At the time of the implementation, the reference website is:
>  
>  http://www.synaccess-net.com/np-0801dt/
>  """
> -from rpsgen import RPSGen
> +from rpsgen import RPSGen, RPSGenException
> +from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
> +from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot, NpCmdRebootSlot
> +
>  
>  class np0801dt(RPSGen):
> -    outlet_count=8
> +
>      def __init__(self):
> -        super(outlet_count)
> +        super(np0801dt, self).__init__(Np0801Dx.slot_max)
> +        device_ip = '192.168.1.100' #This shall come from a config file
> +        adm_auth_info = { 'username' : 'admin', 'password' : 'admin' } #This shall come from a config file

For handle options of this module you could add *args and **kwargs in
__init__(self, *args, **kwargs) and the search for specific parameters
such as 'device_ip', 'username' and 'password' into kwargs if not found
use a default one defining a constant at class level i.e.

DEFAULT_SERVER_IP = '192.168.1.100'


> +        self.__applier = NpCmdApplier(Np0801Dx(device_ip, adm_auth_info))
> +
> +    def turn_all_on(self):
> +        self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['ON']))
> +        results = self.__applier.update()
> +        self.__eval_except(results.pop())
> +
> +    def turn_all_off(self):
> +        self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status['OFF']))
> +        results = self.__applier.update()
> +        self.__eval_except(results.pop())
> +
> +    def turn_on_by_id(self, outlet_id):
> +        self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['ON']))
> +
> +    def turn_off_by_id(self, outlet_id):
> +        self.__eval_except(self.__ac_slot(outlet_id, Np0801Dx.slot_status['OFF']))
> +
> +    def __eval_except(self, cmd_rc):
> +        if cmd_rc == Np0801Dx.cmd_status['CMD_FAIL']:
> +            raise RPSGenException("Failed action", cmd_rc)
> +        if cmd_rc == Np0801Dx.cmd_status['HW_FAIL']:
> +            raise RPSGenException("Experimenting a hw fail",cmd_rc)
> +
> +    def __ac_slot(self, slot_number, status):
> +        self.__applier.add( NpCmdAlterSlot(slot_number, status))
> +        results = self.__applier.update()
> +        return results.pop()
> diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
> new file mode 100644
> index 0000000..12bb76a
> --- /dev/null
> +++ b/devauto/rps/hw/np0801dx.py
> @@ -0,0 +1,185 @@
> +"""
> +np0801d(t or b) Remote power switch control implementation
> +
> +At the time of the implementation, the reference website is:
> +
> +http://www.synaccess-net.com/np-0801dt/
> +"""
> +
> +class Np0801Dx():
> +    """Represents capabilities and features of np0801dx.
> +
> +    Ip address and administration credentials
> +    of np-0801dx shall be required to control
> +    """
> +    cmd_status = {'CMD_OK': 0, 'CMD_FAIL': -1, 'HW_FAIL': -2}

The returns needs to be standardized across modules so this definitions
needs to be difined at level of the base class RPSControl.

> +    slot_status = {'ON': 1, 'OFF': 0}
> +    slot_max = 8
> +
> +    def __init__(self, hostname, adm_auth_info):
> +        self.__host = hostname
> +        self.__adm_auth_info = adm_auth_info
> +
> +    def do(self, data):
> +        """Do a command through np-0801dx CGI-API.
> +
> +        It shall receive an unquote string to be pushed
> +        as parameters of a HTTP-GET request
> +        """
> +        import urllib.request
> +        import base64
> +        from array import array
> +
> +        auth_str = ('%s:%s' % (self.__adm_auth_info['username'], self.__adm_auth_info['password']))
> +        answer = "http://{0}/cmd.cgi?{1}".format(self.__host, urllib.request.pathname2url(data))
> +        basic64 = b"Basic " + base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'') 
> +        req = urllib.request.Request(answer)
> +        req.add_header("Authorization", basic64)
> +        with urllib.request.urlopen(req) as response:
> +            return response.read().replace(b'\r\n', b'')
> +
> +class NpCmdApplier:
> +    """Applies commands to Np0801Dx instance.
> +
> +    This class makes possible the creation of
> +    command sequences to be applied in a row.
> +    A sequence shall be compounded by one or
> +    more commands
> +    """
> +    def __init__(self, pms):
> +        self.commands = []
> +
> +        if not isinstance(pms, Np0801Dx):
> +            raise ValueError('Not valid Np0801Dx instance')
> +
> +        self.np_model = pms
> +
> +    def add(self, command):
> +        """Add command to FIFO that runs on an update call."""
> +        self.commands.append(command)
> +
> +    def update(self):
> +        """Run the overall of commands from FIFO.
> +
> +        Every command shall append its return
> +        upon a list of replies, such list shall
> +        abide with FIFO order
> +        """
> +        replies = []
> +        for element in self.commands:
> +            replies.append(element.go(self.np_model))
> +        self.commands = []
> +        return replies
> +
> +
> +class NpMasterCmd:
> +    """Template pattern for required np-0801dx commands.
> +
> +    Any np-0801dx command implementing this
> +    template shall supply an instruction code
> +    with a number of seconds to concrete the command
> +    """
> +    def __init__(self, ic, tc):
> +        self.inst_code = ic
> +        self.breathe_time = tc
> +        self.cmd_arg_1 = None
> +        self.cmd_arg_2 = None
> +
> +    def go(self, pms):
> +        """Event handler that reacts to an applier update."""
> +        import urllib.parse
> +        import urllib.error
> +        import time
> +        url_params = [self.inst_code]
> +        args = [ d for d in (self.cmd_arg_1, self.cmd_arg_2) if d or d == 0 ]
> +        for a in (args):
> +            arg_str = a if isinstance(a, str) else str(a)
> +            if arg_str:
> +                url_params.append(" ")
> +                url_params.append(arg_str)
> +        data = urllib.parse.unquote(''.join(url_params))
> +
> +        try:
> +            reply = pms.do(data)
> +            time.sleep(self.breathe_time)
> +            return self.parse_reply(reply)
> +        except (urllib.error.URLError, urllib.error.HTTPError) as error:
> +            return Np0801Dx.cmd_status['HW_FAIL']
> +
> +    def parse_reply(self, reply):
> +        """To be implemented as reply parser of any command."""
> +        pass
> +
> +
> +class NpCmdAlterSlot(NpMasterCmd):
> +    """Implementation of turn off/on over specific slot."""
> +    def __init__(self, slot, st):
> +
> +        super(NpCmdAlterSlot, self).__init__('$A3', 3)
> +
> +        if not isinstance(slot, int):
> +            raise ValueError('Not valid socket selected')
> +
> +        if not isinstance(st, int):
> +            raise ValueError('Not valid state selected')
> +
> +        self.cmd_arg_1 = slot
> +        self.cmd_arg_2 = st
> +
> +    def parse_reply(self, reply):
> +        """Return CMD_OK when success otherwise CMD_FAIL."""
> +        if reply == b'$A0':
> +            return Np0801Dx.cmd_status['CMD_OK']
> +        return Np0801Dx.cmd_status['CMD_FAIL']
> +
> +class NpCmdRebootSlot(NpMasterCmd):
> +    """Implementation of reboot over specific slot."""
> +    def __init__(self, slot):
> +
> +        super(NpCmdRebootSlot, self).__init__('$A4', 4)
> +
> +        if not isinstance(slot, int):
> +            raise ValueError('Not valid socket selected')
> +
> +        self.cmd_arg_1 = slot
> +
> +    def parse_reply(self, reply):
> +        """Return CMD_OK when success otherwise CMD_FAIL."""
> +        if reply == b'$A0':
> +            return Np0801Dx.cmd_status['CMD_OK']
> +        return Np0801Dx.cmd_status['CMD_FAIL']
> +
> +class NpCmdMonitor(NpMasterCmd):
> +    """Implementation of getting info to monitor."""
> +    def __init__(self):
> +
> +        super(NpCmdMonitor, self).__init__('$A5', 2)
> +
> +    def parse_reply(self, reply):
> +        """Return status info when measures otherwise None."""
> +        rv = {'ss':None, 'amps':None, 'temp':None} 
> +        vals = (reply).decode('utf-8').split(',')
> +
> +        if vals[0] == '$A0':
> +            if len(vals) == 4:
> +                rv['amps'] = vals[2]
> +                rv['temp'] = vals[3]
> +                rv['ss'] = vals[1]
> +        return rv

Only for next step, we need to define a common interface for return the
status of monitor some device, at this step is ok to return values
directly related to the NP-0801D power switch.

> +
> +class NpCmdAlterAllSlots(NpMasterCmd):
> +    """Implementation of turn off/on on all the slots."""
> +    def __init__(self, st):
> +
> +        super(NpCmdAlterAllSlots, self).__init__('$A7', 7)
> +
> +        if not isinstance(st, int):
> +            raise ValueError('Not valid state selected')
> +
> +        self.cmd_arg_1 = st
> +
> +    def parse_reply(self, reply):
> +        """Return CMD_OK when success otherwise CMD_FAIL."""
> +        if reply == b'$A0':
> +            return Np0801Dx.cmd_status['CMD_OK']
> +        return Np0801Dx.cmd_status['CMD_FAIL']
> diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
> index dea0119..53d0639 100644
> --- a/devauto/rps/hw/rpsgen.py
> +++ b/devauto/rps/hw/rpsgen.py
> @@ -13,11 +13,17 @@ class RPSGen:
>      def __init__(self, outlet_count):
>          self.outlet_count = outlet_count
>  
> -    def turn_all_on():
> +    def turn_all_on(self):
>          pass
> -    def turn_all_off():
> +    def turn_all_off(self):
>          pass
> -    def turn_on_by_id(outlet_id):
> +    def turn_on_by_id(self,outlet_id):
>          pass
> -    def turn_off_by_id(outlet_id):
> +    def turn_off_by_id(self,outlet_id):
>          pass
> +
> +class RPSGenException(Exception):
> +    """Remote power switch base exception."""
> +    def __init__(self, msg, edata):
> +        super(RPSGenException, self).__init__(msg)
> +        self.edata = edata
> 

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: OpenPGP digital signature
URL: <http://lists.yoctoproject.org/pipermail/automated-testing/attachments/20160628/60a95035/attachment.pgp>


More information about the automated-testing mailing list