[Automated-testing] [PATCH] devauto: Method Implementation for np0801dt
Edwin Plauchu
edwin.plauchu.camacho at linux.intel.com
Wed Jun 22 15:30:09 PDT 2016
From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
Support for the NP-0801D(T) remote power management system has been implemented on top of np0801dx.py,
a shared module that can drive either the T or the B series.
Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
---
devauto/rps/hw/np0801dt.py | 56 +++++++++++++++-
devauto/rps/hw/np0801dx.py | 164 +++++++++++++++++++++++++++++++++++++++++++++
devauto/rps/hw/rpsgen.py | 8 +--
3 files changed, 221 insertions(+), 7 deletions(-)
create mode 100644 devauto/rps/hw/np0801dx.py
diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
index b1a456d..b7e64ff 100644
--- a/devauto/rps/hw/np0801dt.py
+++ b/devauto/rps/hw/np0801dt.py
@@ -6,8 +6,58 @@ At the time of the implementation, the reference website is:
http://www.synaccess-net.com/np-0801dt/
"""
from rpsgen import RPSGen
+from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
+from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot, NpCmdRebootSlot
+from np0801dx import NpSlotNumber, NpSlotStatus
+
class np0801dt(RPSGen):
- outlet_count=8
- def __init__(self):
- super(outlet_count)
+
+ def __init__( self ):
+ super( np0801dt , self ).__init__( NpSlotNumber.EIGTH.value )
+ self.__applier = NpCmdApplier( Np0801Dx("192.168.1.100") )
+
+ def turn_all_on( self ):
+ self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.ON ) )
+ results = self.__applier.update()
+ return results.pop()
+
+ def turn_all_off( self ):
+ self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.OFF ) )
+ results = self.__applier.update()
+ return results.pop()
+
+ def turn_on_by_id( self, outlet_id ):
+ if outlet_id > 0 and outlet_id <= self.outlet_count:
+ return self.__ac_slot( self.__glue_enum_item( NpSlotNumber, outlet_id ), NpSlotStatus.ON )
+ return False
+
+ def turn_off_by_id( self, outlet_id ):
+ if outlet_id > 0 and outlet_id <= self.outlet_count:
+ return self.__ac_slot( self.__glue_enum_item( NpSlotNumber, outlet_id ), NpSlotStatus.OFF )
+ return False
+
+ def reboot_by_id( self, outlet_id ):
+ if outlet_id > 0 and outlet_id <= self.outlet_count:
+ slot = self.__glue_enum_item( NpSlotNumber, outlet_id )
+ self.__applier.add( NpCmdRebootSlot( slot ) )
+ results = self.__applier.update()
+ return results.pop()
+ return False
+
+ def monitor( self ):
+ self.__applier.add( NpCmdMonitor() )
+ results = self.__applier.update()
+ return results.pop()
+
+ def __ac_slot( self, slot_number, status ):
+ self.__applier.add( NpCmdAlterSlot( slot_number , status ) )
+ results = self.__applier.update()
+ return results.pop()
+
+ def __glue_enum_item( self, enum_fam , number ):
+ relement = None
+ for name, member in enum_fam.__members__.items():
+ if member.value == number:
+ relement = member
+ return relement
diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
new file mode 100644
index 0000000..9b9bb06
--- /dev/null
+++ b/devauto/rps/hw/np0801dx.py
@@ -0,0 +1,164 @@
+"""
+np0801d(t or b) Remote power switch control implementation
+
+At the time of the implementation, the reference website is:
+
+http://www.synaccess-net.com/np-0801dt/
+"""
+
+from enum import IntEnum, unique
+
+@unique
+class NpSlotNumber( IntEnum ):
+ ONE = 1
+ TWO = 2
+ THREE = 3
+ FOUR = 4
+ FIVE = 5
+ SIX = 6
+ SEVEN = 7
+ EIGTH = 8
+
+@unique
+class NpSlotStatus( IntEnum ):
+ OFF = 0
+ ON = 1
+
+
+class Np0801Dx():
+
+ def __init__( self , hostname ):
+ self.__host = hostname
+
+ def do( self , data ):
+ import urllib.request
+ import base64
+ from array import array
+
+ username = 'admin'
+ password = 'admin'
+ auth_str = ( '%s:%s' % (username, password) )
+
+ answer = "http://{0}/cmd.cgi?{1}".format(self.__host , urllib.request.pathname2url(data))
+ basic64 = b"Basic " + base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'')
+ req = urllib.request.Request( answer )
+ req.add_header( "Authorization", basic64 )
+ with urllib.request.urlopen(req) as response:
+ return response.read().replace(b'\r\n', b'')
+
+class NpCmdApplier:
+
+ def __init__( self , pms ):
+ self.commands = []
+
+ if not isinstance( pms , Np0801Dx ):
+ raise ValueError('Not valid Np0801Dx instance')
+
+ self.np_model = pms
+
+ def add( self, command ):
+ self.commands.append( command )
+
+ def update(self):
+ replies = []
+ iter_obj = iter( self.commands )
+ while True:
+ try:
+ element = next( iter_obj )
+ replies.append( element.go( self.np_model ) )
+ except StopIteration:
+ self.commands = []
+ break
+ return replies
+
+class NpMasterCmd:
+
+ def __init__( self , ic, tc ):
+ self.inst_code = ic
+ self.breathe_time = tc
+ self.cmd_arg_1 = None
+ self.cmd_arg_2 = None
+
+ def go( self , pms ):
+ import urllib.parse
+ import time
+ url_params = [ self.inst_code ]
+ args = [ d for d in ( self.cmd_arg_1 , self.cmd_arg_2 ) if d or d == 0 ]
+ for a in ( args ):
+ arg_str = a if isinstance(a, str) else str(a)
+ if arg_str:
+ url_params.append( " " )
+ url_params.append( arg_str )
+ data = urllib.parse.unquote(''.join( url_params ) )
+ reply = pms.do( data )
+ time.sleep( self.breathe_time )
+ return self.parse_reply( reply )
+
+ def parse_reply( self , reply ):
+ pass
+
+
+class NpCmdAlterSlot( NpMasterCmd ):
+
+ def __init__( self, socket , st ):
+
+ super( NpCmdAlterSlot , self ).__init__( '$A3' , 3 )
+
+ if not isinstance( socket , NpSlotNumber ):
+ raise ValueError('Not valid socket selected')
+
+ if not isinstance( st , NpSlotStatus ):
+ raise ValueError('Not valid state selected')
+
+ self.cmd_arg_1 = socket.value
+ self.cmd_arg_2 = st.value
+
+ def parse_reply( self , reply ):
+ return True if reply == b'$A0' else False
+
+class NpCmdRebootSlot( NpMasterCmd ):
+
+ def __init__( self, socket ):
+
+ super( NpCmdRebootSlot , self ).__init__( '$A4', 4 )
+
+ if not isinstance( socket , NpSlotNumber ):
+ raise ValueError('Not valid socket selected')
+
+ self.cmd_arg_1 = socket.value
+
+ def parse_reply( self , reply ):
+ return True if reply == b'$A0' else False
+
+
+class NpCmdMonitor( NpMasterCmd ):
+
+ def __init__( self ):
+
+ super( NpCmdMonitor , self ).__init__( '$A5', 2 )
+
+ def parse_reply( self , reply ):
+ rv = None
+ vals = (reply).decode('utf-8').split(',')
+
+ if vals[0] == '$A0':
+ if len(vals) == 4:
+ rv = {}
+ rv['amperes'] = vals[2]
+ rv['temperature'] = vals[3]
+ rv['slot_status'] = vals[1]
+ return rv
+
+class NpCmdAlterAllSlots( NpMasterCmd ):
+
+ def __init__( self, st ):
+
+ super( NpCmdAlterAllSlots , self ).__init__('$A7', 7 )
+
+ if not isinstance( st , NpSlotStatus ):
+ raise ValueError('Not valid state selected')
+
+ self.cmd_arg_1 = st.value
+
+ def parse_reply( self , reply ):
+ return True if reply == b'$A0' else False
diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
index dea0119..e91d832 100644
--- a/devauto/rps/hw/rpsgen.py
+++ b/devauto/rps/hw/rpsgen.py
@@ -13,11 +13,11 @@ class RPSGen:
def __init__(self, outlet_count):
self.outlet_count = outlet_count
- def turn_all_on():
+ def turn_all_on(self):
pass
- def turn_all_off():
+ def turn_all_off(self):
pass
- def turn_on_by_id(outlet_id):
+ def turn_on_by_id(self,outlet_id):
pass
- def turn_off_by_id(outlet_id):
+ def turn_off_by_id(self,outlet_id):
pass
--
1.9.1
More information about the automated-testing
mailing list