[Automated-testing] [PATCH v2] devauto: Method Implementation for np0801dt

Benjamin Esquivel benjamin.esquivel at linux.intel.com
Tue Jun 28 15:44:49 PDT 2016


On Tue, 2016-06-28 at 15:29 -0500, Aníbal Limón wrote:
> 
> On 06/28/2016 11:10 AM, Edwin Plauchu wrote:
> > 
> > From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> > 
> > The remote power management system NP-0801D(T) has been
> > implemented by utilizing np0801dx.py which allows take over series
> > T or B.
> > 
> > Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> > ---
> >  devauto/rps/hw/np0801dt.py |  39 +++++++++-
> >  devauto/rps/hw/np0801dx.py | 185
> > +++++++++++++++++++++++++++++++++++++++++++++
> >  devauto/rps/hw/rpsgen.py   |  14 +++-
> >  3 files changed, 231 insertions(+), 7 deletions(-)
> >  create mode 100644 devauto/rps/hw/np0801dx.py
> > 
> > diff --git a/devauto/rps/hw/np0801dt.py
> > b/devauto/rps/hw/np0801dt.py
> > index b1a456d..440612e 100644
> > --- a/devauto/rps/hw/np0801dt.py
> > +++ b/devauto/rps/hw/np0801dt.py
> > @@ -5,9 +5,42 @@ At the time of the implementation, the reference
> > website is:
> >  
> >  http://www.synaccess-net.com/np-0801dt/
> >  """
> > -from rpsgen import RPSGen
> > +from rpsgen import RPSGen, RPSGenException
> > +from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
> > +from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot,
> > NpCmdRebootSlot
> > +
> >  
> >  class np0801dt(RPSGen):
> > -    outlet_count=8
> > +
> >      def __init__(self):
> > -        super(outlet_count)
> > +        super(np0801dt, self).__init__(Np0801Dx.slot_max)
> > +        device_ip = '192.168.1.100' #This shall come from a config
> > file
> > +        adm_auth_info = { 'username' : 'admin', 'password' :
> > 'admin' } #This shall come from a config file
> To handle options for this module you could add *args and **kwargs to
> __init__(self, *args, **kwargs) and then search kwargs for specific
> parameters such as 'device_ip', 'username' and 'password'; if a
> parameter is not found, use a default one defined as a constant at
> class level, i.e.
> 
> DEFAULT_SERVER_IP = '192.168.1.100'
> 
> 
> > 
> > +        self.__applier = NpCmdApplier(Np0801Dx(device_ip,
> > adm_auth_info))
> > +
> > +    def turn_all_on(self):
> > +        self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status
> > ['ON']))
> > +        results = self.__applier.update()
> > +        self.__eval_except(results.pop())
> > +
> > +    def turn_all_off(self):
> > +        self.__applier.add(NpCmdAlterAllSlots(Np0801Dx.slot_status
> > ['OFF']))
> > +        results = self.__applier.update()
> > +        self.__eval_except(results.pop())
> > +
> > +    def turn_on_by_id(self, outlet_id):
> > +        self.__eval_except(self.__ac_slot(outlet_id,
> > Np0801Dx.slot_status['ON']))
> > +
> > +    def turn_off_by_id(self, outlet_id):
> > +        self.__eval_except(self.__ac_slot(outlet_id,
> > Np0801Dx.slot_status['OFF']))
> > +
> > +    def __eval_except(self, cmd_rc):
This function is not good for handling exceptions, and it is still based
on return codes. We should meet to talk about what this layer should
look like.
> > +        if cmd_rc == Np0801Dx.cmd_status['CMD_FAIL']:
> > +            raise RPSGenException("Failed action", cmd_rc)
> > +        if cmd_rc == Np0801Dx.cmd_status['HW_FAIL']:
> > +            raise RPSGenException("Experimenting a hw
> > fail",cmd_rc)
> > +
> > +    def __ac_slot(self, slot_number, status):
> > +        self.__applier.add( NpCmdAlterSlot(slot_number, status))
> > +        results = self.__applier.update()
> > +        return results.pop()
> > diff --git a/devauto/rps/hw/np0801dx.py
> > b/devauto/rps/hw/np0801dx.py
> > new file mode 100644
> > index 0000000..12bb76a
> > --- /dev/null
> > +++ b/devauto/rps/hw/np0801dx.py
> > @@ -0,0 +1,185 @@
> > +"""
> > +np0801d(t or b) Remote power switch control implementation
> > +
> > +At the time of the implementation, the reference website is:
> > +
> > +http://www.synaccess-net.com/np-0801dt/
> > +"""
> > +
> > +class Np0801Dx():
> > +    """Represents capabilities and features of np0801dx.
> > +
> > +    Ip address and administration credentials
> > +    of np-0801dx shall be required to control
> > +    """
> > +    cmd_status = {'CMD_OK': 0, 'CMD_FAIL': -1, 'HW_FAIL': -2}
> The return values need to be standardized across modules, so these
> definitions need to be defined at the level of the base class
> RPSControl.
This could be better handled via exceptions instead of enums for command
return values.
> 
> > 
> > +    slot_status = {'ON': 1, 'OFF': 0}
> > +    slot_max = 8
> > +
> > +    def __init__(self, hostname, adm_auth_info):
> > +        self.__host = hostname
> > +        self.__adm_auth_info = adm_auth_info
> > +
> > +    def do(self, data):
The name of the function is too generic; request or send_request would
describe it better.
> > +        """Do a command through np-0801dx CGI-API.
> > +
> > +        It shall receive an unquote string to be pushed
> > +        as parameters of a HTTP-GET request
> > +        """
> > +        import urllib.request
> > +        import base64
> > +        from array import array
> > +
> > +        auth_str = ('%s:%s' % (self.__adm_auth_info['username'],
> > self.__adm_auth_info['password']))
> > +        answer = "http://{0}/cmd.cgi?{1}".format(self.__host,
> > urllib.request.pathname2url(data))
This mixes the use of .format and %s string formatting.
> > +        basic64 = b"Basic " +
> > base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'') 
> > +        req = urllib.request.Request(answer)
> > +        req.add_header("Authorization", basic64)
> > +        with urllib.request.urlopen(req) as response:
> > +            return response.read().replace(b'\r\n', b'')
> > +
> > +class NpCmdApplier:
> > +    """Applies commands to Np0801Dx instance.
> > +
> > +    This class makes possible the creation of
> > +    command sequences to be applied in a row.
> > +    A sequence shall be compounded by one or
> > +    more commands
Docstrings wrap at 72 chars, meaning the last word on a line should not
go past column 72. These lines are way shorter than that, and they will
mess up the documentation format. This applies to the rest of the
function comments as well.
> > +    """
> > +    def __init__(self, pms):
> > +        self.commands = []
> > +
> > +        if not isinstance(pms, Np0801Dx):
> > +            raise ValueError('Not valid Np0801Dx instance')
> > +
> > +        self.np_model = pms
> > +
> > +    def add(self, command):
> > +        """Add command to FIFO that runs on an update call."""
> > +        self.commands.append(command)
> > +
> > +    def update(self):
> > +        """Run the overall of commands from FIFO.
> > +
> > +        Every command shall append its return
> > +        upon a list of replies, such list shall
> > +        abide with FIFO order
> > +        """
> > +        replies = []
> > +        for element in self.commands:
> > +            replies.append(element.go(self.np_model))
> > +        self.commands = []
> > +        return replies
> > +
> > +
> > +class NpMasterCmd:
> > +    """Template pattern for required np-0801dx commands.
> > +
> > +    Any np-0801dx command implementing this
> > +    template shall supply an instruction code
> > +    with a number of seconds to concrete the command
> > +    """
> > +    def __init__(self, ic, tc):
> > +        self.inst_code = ic
> > +        self.breathe_time = tc
> > +        self.cmd_arg_1 = None
> > +        self.cmd_arg_2 = None
> > +
> > +    def go(self, pms):
> > +        """Event handler that reacts to an applier update."""
> > +        import urllib.parse
> > +        import urllib.error
> > +        import time
> > +        url_params = [self.inst_code]
> > +        args = [ d for d in (self.cmd_arg_1, self.cmd_arg_2) if d
> > or d == 0 ]
> > +        for a in (args):
> > +            arg_str = a if isinstance(a, str) else str(a)
> > +            if arg_str:
> > +                url_params.append(" ")
> > +                url_params.append(arg_str)
> > +        data = urllib.parse.unquote(''.join(url_params))
> > +
> > +        try:
> > +            reply = pms.do(data)
> > +            time.sleep(self.breathe_time)
> > +            return self.parse_reply(reply)
> > +        except (urllib.error.URLError, urllib.error.HTTPError) as
> > error:
> > +            return Np0801Dx.cmd_status['HW_FAIL']
Almost there with implementing the exception: if the hardware failed,
then log it and re-raise so the calling instance knows that something
went wrong.
> > +
> > +    def parse_reply(self, reply):
> > +        """To be implemented as reply parser of any command."""
> > +        pass
> > +
> > +
> > +class NpCmdAlterSlot(NpMasterCmd):
> > +    """Implementation of turn off/on over specific slot."""
> > +    def __init__(self, slot, st):
> > +
> > +        super(NpCmdAlterSlot, self).__init__('$A3', 3)
What's this $A3?
> > +
> > +        if not isinstance(slot, int):
> > +            raise ValueError('Not valid socket selected')
> > +
> > +        if not isinstance(st, int):
> > +            raise ValueError('Not valid state selected')
> > +
> > +        self.cmd_arg_1 = slot
> > +        self.cmd_arg_2 = st
> > +
> > +    def parse_reply(self, reply):
> > +        """Return CMD_OK when success otherwise CMD_FAIL."""
perhaps the function can be described better.
> > +        if reply == b'$A0':
> > +            return Np0801Dx.cmd_status['CMD_OK']
> > +        return Np0801Dx.cmd_status['CMD_FAIL']
CMD_OK is pretty useless. CMD_FAIL is also useless if, instead of
returning upon failure, you raise an exception.
> > +
> > +class NpCmdRebootSlot(NpMasterCmd):
Reboot is out of the scope of this module. If power sequences such as
reboot are required, they should be done in the next abstraction layer.
I believe it is safe to remove this class.
> > +    """Implementation of reboot over specific slot."""
> > +    def __init__(self, slot):
> > +
> > +        super(NpCmdRebootSlot, self).__init__('$A4', 4)
> > +
> > +        if not isinstance(slot, int):
> > +            raise ValueError('Not valid socket selected')
> > +
> > +        self.cmd_arg_1 = slot
> > +
> > +    def parse_reply(self, reply):
> > +        """Return CMD_OK when success otherwise CMD_FAIL."""
> > +        if reply == b'$A0':
> > +            return Np0801Dx.cmd_status['CMD_OK']
> > +        return Np0801Dx.cmd_status['CMD_FAIL']
> > +
> > +class NpCmdMonitor(NpMasterCmd):
> > +    """Implementation of getting info to monitor."""
> > +    def __init__(self):
> > +
> > +        super(NpCmdMonitor, self).__init__('$A5', 2)
> > +
> > +    def parse_reply(self, reply):
> > +        """Return status info when measures otherwise None."""
> > +        rv = {'ss':None, 'amps':None, 'temp':None} 
> > +        vals = (reply).decode('utf-8').split(',')
> > +
> > +        if vals[0] == '$A0':
> > +            if len(vals) == 4:
> > +                rv['amps'] = vals[2]
> > +                rv['temp'] = vals[3]
> > +                rv['ss'] = vals[1]
else you raise an error
> > +        return rv
this is a good case to use a return.
> Only as a next step, we need to define a common interface for
> returning the status when monitoring a device; at this step it is OK
> to return values directly related to the NP-0801D power switch.
> 
> > 
> > +
> > +class NpCmdAlterAllSlots(NpMasterCmd):
> > +    """Implementation of turn off/on on all the slots."""
> > +    def __init__(self, st):
> > +
> > +        super(NpCmdAlterAllSlots, self).__init__('$A7', 7)
> > +
> > +        if not isinstance(st, int):
> > +            raise ValueError('Not valid state selected')
> > +
> > +        self.cmd_arg_1 = st
> > +
> > +    def parse_reply(self, reply):
> > +        """Return CMD_OK when success otherwise CMD_FAIL."""
function description needs work as well.
> > +        if reply == b'$A0':
> > +            return Np0801Dx.cmd_status['CMD_OK']
> > +        return Np0801Dx.cmd_status['CMD_FAIL']
Same thing with the returns: you don't return OKs, you raise errors
when things go wrong.
> > diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
> > index dea0119..53d0639 100644
> > --- a/devauto/rps/hw/rpsgen.py
> > +++ b/devauto/rps/hw/rpsgen.py
> > @@ -13,11 +13,17 @@ class RPSGen:
> >      def __init__(self, outlet_count):
> >          self.outlet_count = outlet_count
> >  
> > -    def turn_all_on():
> > +    def turn_all_on(self):
> >          pass
> > -    def turn_all_off():
> > +    def turn_all_off(self):
> >          pass
> > -    def turn_on_by_id(outlet_id):
> > +    def turn_on_by_id(self,outlet_id):
> >          pass
> > -    def turn_off_by_id(outlet_id):
> > +    def turn_off_by_id(self,outlet_id):
> >          pass
> > +
> > +class RPSGenException(Exception):
> > +    """Remote power switch base exception."""
> > +    def __init__(self, msg, edata):
> > +        super(RPSGenException, self).__init__(msg)
> > +        self.edata = edata
> > 
> -- 
> _______________________________________________
> automated-testing mailing list
> automated-testing at yoctoproject.org
> https://lists.yoctoproject.org/listinfo/automated-testing


More information about the automated-testing mailing list