[Automated-testing] [PATCH] devauto: Method Implementation for np0801dt

Joshua G Lock joshua.g.lock at linux.intel.com
Thu Jun 23 03:53:13 PDT 2016


Hi Edwin,

On Wed, 2016-06-22 at 17:30 -0500, Edwin Plauchu wrote:
> From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> 
> The remote power management system NP-0801D(T) has been implemented
> by utilizing np0801dx.py which allows
> take over series T or B.
> 
> Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
> ---
>  devauto/rps/hw/np0801dt.py |  56 +++++++++++++++-
>  devauto/rps/hw/np0801dx.py | 164
> +++++++++++++++++++++++++++++++++++++++++++++
>  devauto/rps/hw/rpsgen.py   |   8 +--
>  3 files changed, 221 insertions(+), 7 deletions(-)
>  create mode 100644 devauto/rps/hw/np0801dx.py
> 
> diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
> index b1a456d..b7e64ff 100644
> --- a/devauto/rps/hw/np0801dt.py
> +++ b/devauto/rps/hw/np0801dt.py
> @@ -6,8 +6,58 @@ At the time of the implementation, the reference
> website is:
>  http://www.synaccess-net.com/np-0801dt/
>  """
>  from rpsgen import RPSGen
> +from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
> +from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot,
> NpCmdRebootSlot
> +from np0801dx import NpSlotNumber, NpSlotStatus
> +
>  
>  class np0801dt(RPSGen):
> -    outlet_count=8
> -    def __init__(self):
> -        super(outlet_count)
> +
> +    def __init__( self ):
> +        super( np0801dt , self ).__init__( NpSlotNumber.EIGTH.value
> )

Typo in this constant name; it should be EIGHT.

> +        self.__applier = NpCmdApplier( Np0801Dx("192.168.1.100") )

This hard-codes the IP address. Having a default IP address is not a
bad idea but this should be easier to customise than modifying the
class.

> +
> +    def turn_all_on( self ):
> +        self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.ON ) )
> +        results = self.__applier.update()
> +        return results.pop()
> +
> +    def turn_all_off( self ):
> +        self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.OFF ) )
> +        results = self.__applier.update()
> +        return results.pop()
> +
> +    def turn_on_by_id( self, outlet_id ):
> +        if outlet_id > 0 and outlet_id <= self.outlet_count:
> +            return self.__ac_slot( self.__glue_enum_item(
> NpSlotNumber, outlet_id ), NpSlotStatus.ON )
> +        return False
> +
> +    def turn_off_by_id( self, outlet_id ):
> +        if outlet_id > 0 and outlet_id <= self.outlet_count:
> +            return self.__ac_slot( self.__glue_enum_item(
> NpSlotNumber, outlet_id ), NpSlotStatus.OFF )
> +        return False
> +
> +    def reboot_by_id( self, outlet_id ):
> +        if outlet_id > 0 and outlet_id <= self.outlet_count:
> +            slot = self.__glue_enum_item( NpSlotNumber, outlet_id )
> +            self.__applier.add( NpCmdRebootSlot( slot ) )
> +            results = self.__applier.update()
> +            return results.pop()
> +        return False
> +
> +    def monitor( self ):
> +        self.__applier.add( NpCmdMonitor() )
> +        results = self.__applier.update()
> +        return results.pop()
> +
> +    def __ac_slot( self, slot_number, status ):
> +        self.__applier.add( NpCmdAlterSlot( slot_number , status ) )
> +        results = self.__applier.update()
> +        return results.pop()
> +
> +    def __glue_enum_item( self, enum_fam , number ):
> +        relement = None
> +        for name, member in enum_fam.__members__.items():
> +           if member.value == number:
> +               relement = member
> +        return relement
> diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
> new file mode 100644
> index 0000000..9b9bb06
> --- /dev/null
> +++ b/devauto/rps/hw/np0801dx.py
> @@ -0,0 +1,164 @@
> +"""
> +np0801d(t or b) Remote power switch control implementation
> +
> +At the time of the implementation, the reference website is:
> +
> +http://www.synaccess-net.com/np-0801dt/
> +"""
> +
> +from enum import IntEnum, unique
> +
> +@unique
> +class NpSlotNumber( IntEnum ):
> +    ONE   = 1
> +    TWO   = 2
> +    THREE = 3
> +    FOUR  = 4
> +    FIVE  = 5
> +    SIX   = 6
> +    SEVEN = 7
> +    EIGTH = 8

Typo here too; it should be EIGHT.

> +
> +@unique
> +class NpSlotStatus( IntEnum ):
> +    OFF = 0
> +    ON  = 1 
> +
> +
> +class Np0801Dx():
> +
> +    def __init__( self , hostname ):
> +        self.__host = hostname
> +    
> +    def do( self , data ):
> +        import urllib.request
> +        import base64
> +        from array import array
> +
> +        username = 'admin'
> +        password = 'admin'
> +        auth_str = ( '%s:%s' % (username, password) )

Presumably these are the default username and password? I really don't
like the idea of hard-coding these. They should ideally be read from a
configuration file. Didn't I see a patch for devauto which adds an ini
file?

> +
> +        answer = "http://{0}/cmd.cgi?{1}".format(self.__host ,
> urllib.request.pathname2url(data))
> +        basic64 = b"Basic " +
> base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'') 
> +        req = urllib.request.Request( answer )
> +        req.add_header( "Authorization", basic64 ) 
> +        with urllib.request.urlopen(req) as response:
> +            return response.read().replace(b'\r\n', b'')
> +
> +class NpCmdApplier:
> +
> +    def __init__( self , pms ):
> +        self.commands = []
> +
> +        if not isinstance( pms , Np0801Dx ):
> +            raise ValueError('Not valid Np0801Dx instance')
> +
> +        self.np_model = pms
> +
> +    def add( self, command ):
> +        self.commands.append( command )
> +
> +    def update(self):

Coding style is fairly inconsistent. Sometimes you write:

def method(foo):

and other times:

def method( foo ):

Please be consistent and consider following PEP-8:
https://www.python.org/dev/peps/pep-0008/

> +       replies = []
> +       iter_obj = iter( self.commands )

If you've got an iterator you should use the more idiomatic pattern
(still resetting the command list once the loop is done):

for element in iter_obj:
    replies.append(element.go(self.np_model))
self.commands = []
return replies

> +       while True:
> +          try:
> +             element = next( iter_obj )
> +             replies.append( element.go( self.np_model ) )
> +          except StopIteration:
> +             self.commands = []
> +             break
> +       return replies
> +
> +class NpMasterCmd:
> +
> +    def __init__( self , ic, tc ):
> +        self.inst_code = ic
> +        self.breathe_time = tc
> +        self.cmd_arg_1 = None
> +        self.cmd_arg_2 = None
> +
> +    def go( self , pms ):
> +        import urllib.parse
> +        import time
> +        url_params = [ self.inst_code ] 
> +        args = [ d for d in ( self.cmd_arg_1 , self.cmd_arg_2 ) if d
> or d == 0 ]
> +        for a in ( args ):
> +            arg_str = a if isinstance(a, str) else str(a)
> +            if arg_str:
> +            	url_params.append( " " )
> +            	url_params.append( arg_str )

The indentation looks inconsistent here: these two lines mix a tab with
spaces.

> +        data = urllib.parse.unquote(''.join( url_params ) )
> +        reply = pms.do( data )        
> +        time.sleep( self.breathe_time )
> +        return self.parse_reply( reply )
> +
> +    def parse_reply( self , reply ):
> +        pass
> +
> +
> +class NpCmdAlterSlot( NpMasterCmd ):
> +
> +    def __init__( self, socket , st ):
> +
> +        super( NpCmdAlterSlot , self ).__init__( '$A3' , 3 )
> +
> +        if not isinstance( socket , NpSlotNumber ):
> +            raise ValueError('Not valid socket selected')
> +
> +        if not isinstance( st , NpSlotStatus ):
> +            raise ValueError('Not valid state selected')
> +
> +        self.cmd_arg_1 = socket.value 
> +        self.cmd_arg_2 = st.value
> +
> +    def parse_reply( self , reply ):
> +        return True if reply == b'$A0' else False

This could just be:

return reply == b'$A0'

> +
> +class NpCmdRebootSlot( NpMasterCmd ):
> +
> +    def __init__( self, socket ):
> +
> +        super( NpCmdRebootSlot , self ).__init__( '$A4', 4 )
> +
> +        if not isinstance( socket , NpSlotNumber ):
> +            raise ValueError('Not valid socket selected')
> +
> +        self.cmd_arg_1 = socket.value
> +
> +    def parse_reply( self , reply ):
> +        return True if reply == b'$A0' else False 

Same here:

return reply == b'$A0'

> +
> +
> +class NpCmdMonitor( NpMasterCmd ):
> +
> +    def __init__( self ):
> +
> +        super( NpCmdMonitor , self ).__init__( '$A5', 2 )
> +
> +    def parse_reply( self , reply ):
> +        rv = None
> +        vals = (reply).decode('utf-8').split(',')
> +
> +        if vals[0] == '$A0':
> +            if len(vals) == 4:
> +                rv = {} 
> +                rv['amperes'] = vals[2]
> +                rv['temperature'] = vals[3]
> +                rv['slot_status'] = vals[1]
> +        return rv
> +
> +class NpCmdAlterAllSlots( NpMasterCmd ):
> +
> +    def __init__( self, st ):
> +
> +        super( NpCmdAlterAllSlots , self ).__init__('$A7', 7 )
> +
> +        if not isinstance( st , NpSlotStatus ):
> +            raise ValueError('Not valid state selected')
> +
> +        self.cmd_arg_1 = st.value
> +
> +    def parse_reply( self , reply ):
> +        return True if reply == b'$A0' else False 
> diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
> index dea0119..e91d832 100644
> --- a/devauto/rps/hw/rpsgen.py
> +++ b/devauto/rps/hw/rpsgen.py
> @@ -13,11 +13,11 @@ class RPSGen:
>      def __init__(self, outlet_count):
>          self.outlet_count = outlet_count
>  
> -    def turn_all_on():
> +    def turn_all_on(self):
>          pass
> -    def turn_all_off():
> +    def turn_all_off(self):
>          pass
> -    def turn_on_by_id(outlet_id):
> +    def turn_on_by_id(self,outlet_id):
>          pass
> -    def turn_off_by_id(outlet_id):
> +    def turn_off_by_id(self,outlet_id):
>          pass
> -- 
> 1.9.1
> 


More information about the automated-testing mailing list