[Automated-testing] [PATCH] devauto: Method Implementation for np0801dt

Benjamin Esquivel benjamin.esquivel at linux.intel.com
Thu Jun 23 12:00:21 PDT 2016


Thanks for sending the review :)

I have some specific comments inline.

One overall comment I have is that all the classes and functions should
be documented as pep 257[1] exemplifies.


[1] https://www.python.org/dev/peps/pep-0257/

> 
> -----Original Message-----
> From: automated-testing-bounces at yoctoproject.org [mailto:automated-
> testing-bounces at yoctoproject.org] On Behalf Of Edwin Plauchu
> Sent: Wednesday, June 22, 2016 5:30 PM
> To: automated-testing at yoctoproject.org
> Cc: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> Subject: [Automated-testing] [PATCH] devauto: Method Implementation
> for
> np0801dt
> 
> From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> 
> The remote power management system NP-0801D(T) has been
> implemented by utilizing np0801dx.py which allows take over series T
> or B.
> 
> 
> Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> ---
>  devauto/rps/hw/np0801dt.py |  56 +++++++++++++++-
> devauto/rps/hw/np0801dx.py | 164
> +++++++++++++++++++++++++++++++++++++++++++++
>  devauto/rps/hw/rpsgen.py   |   8 +--
>  3 files changed, 221 insertions(+), 7 deletions(-)  create mode
> 100644
> devauto/rps/hw/np0801dx.py
> 
> diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
> index b1a456d..b7e64ff 100644
> --- a/devauto/rps/hw/np0801dt.py
> +++ b/devauto/rps/hw/np0801dt.py
> @@ -6,8 +6,58 @@ At the time of the implementation, the reference
> website is:
>  http://www.synaccess-net.com/np-0801dt/
>  """
>  from rpsgen import RPSGen
> +from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor from
> np0801dx
> +import NpCmdAlterAllSlots, NpCmdAlterSlot, NpCmdRebootSlot from
> +np0801dx import NpSlotNumber, NpSlotStatus
The above 3 import lines all take items from np0801dx. Consider just
importing np0801dx.
> 
> +
> 
>  class np0801dt(RPSGen):
> -    outlet_count=8
> -    def __init__(self):
> -        super(outlet_count)
> +
> +    def __init__( self ):
> +        super( np0801dt , self ).__init__( NpSlotNumber.EIGTH.value
> )
> +        self.__applier = NpCmdApplier( Np0801Dx("192.168.1.100") )
Agreeing with Joshua on the ip here should not be hardcoded and better
if read through the ini settings. 
> 
> +
> +    def turn_all_on( self ):
> +        self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.ON
> ) )+        results = self.__applier.update()
since you're asking something to a module that controls hardware, you
should be watching out for exceptions. Applies to all the actions
below.
> +        return results.pop()
> +
> +    def turn_all_off( self ):
> +        self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.OFF ) )
> +        results = self.__applier.update()
> +        return results.pop()
> +
> +    def turn_on_by_id( self, outlet_id ):
> +        if outlet_id > 0 and outlet_id <= self.outlet_count:
> +            return self.__ac_slot( self.__glue_enum_item(
> NpSlotNumber,
> outlet_id ), NpSlotStatus.ON )
This is very C like, you would raise an exception if the request is out
of bounds. You would (re-)raise another exception if an error happened.
And you can just ignore returning at all.
> +        return False
> +
> +    def turn_off_by_id( self, outlet_id ):
> +        if outlet_id > 0 and outlet_id <= self.outlet_count:
> +            return self.__ac_slot( self.__glue_enum_item(
> NpSlotNumber,
> outlet_id ), NpSlotStatus.OFF )
> +        return False
> +
> +    def reboot_by_id( self, outlet_id ):
This might not be the place to handle a reboot. Once you have exposed
the turn on and off methods you can expect that the entity that takes
care of the state machine of the devices is commanding and implementing
reboots and power cycles. I'd say this is polluting the abstraction.
> +        if outlet_id > 0 and outlet_id <= self.outlet_count:
> +            slot = self.__glue_enum_item( NpSlotNumber, outlet_id )
> +            self.__applier.add( NpCmdRebootSlot( slot ) )
> +            results = self.__applier.update()
> +            return results.pop()
> +        return False
> +
> +    def monitor( self ):
what is this for?
> +        self.__applier.add( NpCmdMonitor() )
> +        results = self.__applier.update()
> +        return results.pop()
> +
> +    def __ac_slot( self, slot_number, status ):
> +        self.__applier.add( NpCmdAlterSlot( slot_number , status ) )
> +        results = self.__applier.update()
> +        return results.pop()
> +
> +    def __glue_enum_item( self, enum_fam , number ):
If I understand correctly the purpose of this function, you're
implementing what a python dictionary does. Can you explain what's the
purpose of your function? 
> +        relement = None
> +        for name, member in enum_fam.__members__.items():
> +           if member.value == number:
> +               relement = member
> +        return relement
> diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
> new file mode 100644 index 0000000..9b9bb06
> --- /dev/null
> +++ b/devauto/rps/hw/np0801dx.py
> @@ -0,0 +1,164 @@
> +"""
> +np0801d(t or b) Remote power switch control implementation
> +
> +At the time of the implementation, the reference website is:
> +
> +http://www.synaccess-net.com/np-0801dt/
> +"""
> +
> +from enum import IntEnum, unique
> +
> + at unique
> +class NpSlotNumber( IntEnum ):
> +    ONE   = 1
> +    TWO   = 2
> +    THREE = 3
> +    FOUR  = 4
> +    FIVE  = 5
> +    SIX   = 6
> +    SEVEN = 7
> +    EIGTH = 8
What is this providing than a dictionary can't provide?
> +
> + at unique
> +class NpSlotStatus( IntEnum ):
> +    OFF = 0
> +    ON  = 1
What is this providing than a dictionary can't provide?
> 
> +
> +
> +class Np0801Dx():
> +
> +    def __init__( self , hostname ):
you could skip documenting __init__ functions if you like :)
> +        self.__host = hostname
> +
> +    def do( self , data ):
> +        import urllib.request
> +        import base64
> +        from array import array
> +
> +        username = 'admin'
> +        password = 'admin'
> +        auth_str = ( '%s:%s' % (username, password) )
> +
> +        answer = "http://{0}/cmd.cgi?{1}".format(self.__host ,
> urllib.request.pathname2url(data))
> +        basic64 = b"Basic " +
> base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'')
> +        req = urllib.request.Request( answer )
> +        req.add_header( "Authorization", basic64 )
> +        with urllib.request.urlopen(req) as response:
> +            return response.read().replace(b'\r\n', b'')
> +
> +class NpCmdApplier:
> +
> +    def __init__( self , pms ):
spaces only after commas in function params like: 
def __init__(self, pms):
like pep 257 indicates. Should fix in a lot of places in this code.
> +        self.commands = []
> +
> +        if not isinstance( pms , Np0801Dx ):
> +            raise ValueError('Not valid Np0801Dx instance')
> +
> +        self.np_model = pms
> +
> +    def add( self, command ):
> +        self.commands.append( command )
> +
> +    def update(self):
> +       replies = []
> +       iter_obj = iter( self.commands )
> +       while True:
> +          try:
> +             element = next( iter_obj )
> +             replies.append( element.go( self.np_model ) )
> +          except StopIteration:
> +             self.commands = []
> +             break
> +       return replies
> +
> +class NpMasterCmd:
> +
> +    def __init__( self , ic, tc ):
> +        self.inst_code = ic
> +        self.breathe_time = tc
> +        self.cmd_arg_1 = None
> +        self.cmd_arg_2 = None
> +
> +    def go( self , pms ):
> +        import urllib.parse
> +        import time
> +        url_params = [ self.inst_code ]
> +        args = [ d for d in ( self.cmd_arg_1 , self.cmd_arg_2 ) if d
> or d == 0 ]
> 
> +        for a in ( args ):
> +            arg_str = a if isinstance(a, str) else str(a)
> +            if arg_str:
> +            	url_params.append( " " )
> +            	url_params.append( arg_str )
> +        data = urllib.parse.unquote(''.join( url_params ) )
> +        reply = pms.do( data )
> +        time.sleep( self.breathe_time )
> +        return self.parse_reply( reply )
> +
> +    def parse_reply( self , reply ):
> +        pass
> +
> +
> +class NpCmdAlterSlot( NpMasterCmd ):
> +
> +    def __init__( self, socket , st ):
> +
> +        super( NpCmdAlterSlot , self ).__init__( '$A3' , 3 )
> +
> +        if not isinstance( socket , NpSlotNumber ):
> +            raise ValueError('Not valid socket selected')
> +
> +        if not isinstance( st , NpSlotStatus ):
> +            raise ValueError('Not valid state selected')
> +
> +        self.cmd_arg_1 = socket.value
> +        self.cmd_arg_2 = st.value
> +
> +    def parse_reply( self , reply ):
> +        return True if reply == b'$A0' else False
> +
> +class NpCmdRebootSlot( NpMasterCmd ):
> +
> +    def __init__( self, socket ):
> +
> +        super( NpCmdRebootSlot , self ).__init__( '$A4', 4 )
> +
> +        if not isinstance( socket , NpSlotNumber ):
> +            raise ValueError('Not valid socket selected')
> +
> +        self.cmd_arg_1 = socket.value
> +
> +    def parse_reply( self , reply ):
> +        return True if reply == b'$A0' else False
> +
> +
> +class NpCmdMonitor( NpMasterCmd ):
> +
> +    def __init__( self ):
> +
> +        super( NpCmdMonitor , self ).__init__( '$A5', 2 )
> +
> +    def parse_reply( self , reply ):
> +        rv = None
> +        vals = (reply).decode('utf-8').split(',')
> +
> +        if vals[0] == '$A0':
> +            if len(vals) == 4:
> +                rv = {}
> +                rv['amperes'] = vals[2]
> +                rv['temperature'] = vals[3]
> +                rv['slot_status'] = vals[1]
> +        return rv
> +
> +class NpCmdAlterAllSlots( NpMasterCmd ):
> +
> +    def __init__( self, st ):
> +
> +        super( NpCmdAlterAllSlots , self ).__init__('$A7', 7 )
> +
> +        if not isinstance( st , NpSlotStatus ):
> +            raise ValueError('Not valid state selected')
> +
> +        self.cmd_arg_1 = st.value
> +
> +    def parse_reply( self , reply ):
> +        return True if reply == b'$A0' else False
> diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
> index
> dea0119..e91d832 100644
> --- a/devauto/rps/hw/rpsgen.py
> +++ b/devauto/rps/hw/rpsgen.py
> @@ -13,11 +13,11 @@ class RPSGen:
>      def __init__(self, outlet_count):
>          self.outlet_count = outlet_count
> 
> -    def turn_all_on():
> +    def turn_all_on(self):
good catch, thanks for fixing it.
>          pass
> -    def turn_all_off():
> +    def turn_all_off(self):
>          pass
> -    def turn_on_by_id(outlet_id):
> +    def turn_on_by_id(self,outlet_id):
>          pass
> -    def turn_off_by_id(outlet_id):
> +    def turn_off_by_id(self,outlet_id):
>          pass
> --
> 1.9.1
> 
> --
> _______________________________________________
> automated-testing mailing list
> automated-testing at yoctoproject.org
> https://lists.yoctoproject.org/listinfo/automated-testing


More information about the automated-testing mailing list