[Automated-testing] [PATCH] devauto: Method Implementation for np0801dt

Edwin Plauchu edwin.plauchu.camacho at linux.intel.com
Wed Jun 22 15:30:09 PDT 2016


From: Edwin Plauchu <edwin.plauchu.camacho at intel.com>

The remote power management system NP-0801D(T) has been implemented by utilizing np0801dx.py which allows
take over series T or B.

Signed-off-by: Edwin Plauchu <edwin.plauchu.camacho at intel.com>
---
 devauto/rps/hw/np0801dt.py |  56 +++++++++++++++-
 devauto/rps/hw/np0801dx.py | 164 +++++++++++++++++++++++++++++++++++++++++++++
 devauto/rps/hw/rpsgen.py   |   8 +--
 3 files changed, 221 insertions(+), 7 deletions(-)
 create mode 100644 devauto/rps/hw/np0801dx.py

diff --git a/devauto/rps/hw/np0801dt.py b/devauto/rps/hw/np0801dt.py
index b1a456d..b7e64ff 100644
--- a/devauto/rps/hw/np0801dt.py
+++ b/devauto/rps/hw/np0801dt.py
@@ -6,8 +6,58 @@ At the time of the implementation, the reference website is:
 http://www.synaccess-net.com/np-0801dt/
 """
 from rpsgen import RPSGen
+from np0801dx import Np0801Dx, NpCmdApplier, NpCmdMonitor
+from np0801dx import NpCmdAlterAllSlots, NpCmdAlterSlot, NpCmdRebootSlot
+from np0801dx import NpSlotNumber, NpSlotStatus
+
 
 class np0801dt(RPSGen):
-    outlet_count=8
-    def __init__(self):
-        super(outlet_count)
+
+    def __init__( self ):
+        super( np0801dt , self ).__init__( NpSlotNumber.EIGTH.value )
+        self.__applier = NpCmdApplier( Np0801Dx("192.168.1.100") )
+
+    def turn_all_on( self ):
+        self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.ON ) )
+        results = self.__applier.update()
+        return results.pop()
+
+    def turn_all_off( self ):
+        self.__applier.add( NpCmdAlterAllSlots( NpSlotStatus.OFF ) )
+        results = self.__applier.update()
+        return results.pop()
+
+    def turn_on_by_id( self, outlet_id ):
+        if outlet_id > 0 and outlet_id <= self.outlet_count:
+            return self.__ac_slot( self.__glue_enum_item( NpSlotNumber, outlet_id ), NpSlotStatus.ON )
+        return False
+
+    def turn_off_by_id( self, outlet_id ):
+        if outlet_id > 0 and outlet_id <= self.outlet_count:
+            return self.__ac_slot( self.__glue_enum_item( NpSlotNumber, outlet_id ), NpSlotStatus.OFF )
+        return False
+
+    def reboot_by_id( self, outlet_id ):
+        if outlet_id > 0 and outlet_id <= self.outlet_count:
+            slot = self.__glue_enum_item( NpSlotNumber, outlet_id )
+            self.__applier.add( NpCmdRebootSlot( slot ) )
+            results = self.__applier.update()
+            return results.pop()
+        return False
+
+    def monitor( self ):
+        self.__applier.add( NpCmdMonitor() )
+        results = self.__applier.update()
+        return results.pop()
+
+    def __ac_slot( self, slot_number, status ):
+        self.__applier.add( NpCmdAlterSlot( slot_number , status ) )
+        results = self.__applier.update()
+        return results.pop()
+
+    def __glue_enum_item( self, enum_fam , number ):
+        relement = None
+        for name, member in enum_fam.__members__.items():
+           if member.value == number:
+               relement = member
+        return relement
diff --git a/devauto/rps/hw/np0801dx.py b/devauto/rps/hw/np0801dx.py
new file mode 100644
index 0000000..9b9bb06
--- /dev/null
+++ b/devauto/rps/hw/np0801dx.py
@@ -0,0 +1,164 @@
+"""
+np0801d(t or b) Remote power switch control implementation
+
+At the time of the implementation, the reference website is:
+
+http://www.synaccess-net.com/np-0801dt/
+"""
+
+from enum import IntEnum, unique
+
+ at unique
+class NpSlotNumber( IntEnum ):
+    ONE   = 1
+    TWO   = 2
+    THREE = 3
+    FOUR  = 4
+    FIVE  = 5
+    SIX   = 6
+    SEVEN = 7
+    EIGTH = 8
+
+ at unique
+class NpSlotStatus( IntEnum ):
+    OFF = 0
+    ON  = 1 
+
+
+class Np0801Dx():
+
+    def __init__( self , hostname ):
+        self.__host = hostname
+    
+    def do( self , data ):
+        import urllib.request
+        import base64
+        from array import array
+
+        username = 'admin'
+        password = 'admin'
+        auth_str = ( '%s:%s' % (username, password) )
+
+        answer = "http://{0}/cmd.cgi?{1}".format(self.__host , urllib.request.pathname2url(data))
+        basic64 = b"Basic " + base64.encodestring(auth_str.encode('ascii')).replace(b'\n', b'') 
+        req = urllib.request.Request( answer )
+        req.add_header( "Authorization", basic64 ) 
+        with urllib.request.urlopen(req) as response:
+            return response.read().replace(b'\r\n', b'')
+
+class NpCmdApplier:
+
+    def __init__( self , pms ):
+        self.commands = []
+
+        if not isinstance( pms , Np0801Dx ):
+            raise ValueError('Not valid Np0801Dx instance')
+
+        self.np_model = pms
+
+    def add( self, command ):
+        self.commands.append( command )
+
+    def update(self):
+       replies = []
+       iter_obj = iter( self.commands )
+       while True:
+          try:
+             element = next( iter_obj )
+             replies.append( element.go( self.np_model ) )
+          except StopIteration:
+             self.commands = []
+             break
+       return replies
+
+class NpMasterCmd:
+
+    def __init__( self , ic, tc ):
+        self.inst_code = ic
+        self.breathe_time = tc
+        self.cmd_arg_1 = None
+        self.cmd_arg_2 = None
+
+    def go( self , pms ):
+        import urllib.parse
+        import time
+        url_params = [ self.inst_code ] 
+        args = [ d for d in ( self.cmd_arg_1 , self.cmd_arg_2 ) if d or d == 0 ]
+        for a in ( args ):
+            arg_str = a if isinstance(a, str) else str(a)
+            if arg_str:
+            	url_params.append( " " )
+            	url_params.append( arg_str )
+        data = urllib.parse.unquote(''.join( url_params ) )
+        reply = pms.do( data )        
+        time.sleep( self.breathe_time )
+        return self.parse_reply( reply )
+
+    def parse_reply( self , reply ):
+        pass
+
+
+class NpCmdAlterSlot( NpMasterCmd ):
+
+    def __init__( self, socket , st ):
+
+        super( NpCmdAlterSlot , self ).__init__( '$A3' , 3 )
+
+        if not isinstance( socket , NpSlotNumber ):
+            raise ValueError('Not valid socket selected')
+
+        if not isinstance( st , NpSlotStatus ):
+            raise ValueError('Not valid state selected')
+
+        self.cmd_arg_1 = socket.value 
+        self.cmd_arg_2 = st.value
+
+    def parse_reply( self , reply ):
+        return True if reply == b'$A0' else False
+
+class NpCmdRebootSlot( NpMasterCmd ):
+
+    def __init__( self, socket ):
+
+        super( NpCmdRebootSlot , self ).__init__( '$A4', 4 )
+
+        if not isinstance( socket , NpSlotNumber ):
+            raise ValueError('Not valid socket selected')
+
+        self.cmd_arg_1 = socket.value
+
+    def parse_reply( self , reply ):
+        return True if reply == b'$A0' else False 
+
+
+class NpCmdMonitor( NpMasterCmd ):
+
+    def __init__( self ):
+
+        super( NpCmdMonitor , self ).__init__( '$A5', 2 )
+
+    def parse_reply( self , reply ):
+        rv = None
+        vals = (reply).decode('utf-8').split(',')
+
+        if vals[0] == '$A0':
+            if len(vals) == 4:
+                rv = {} 
+                rv['amperes'] = vals[2]
+                rv['temperature'] = vals[3]
+                rv['slot_status'] = vals[1]
+        return rv
+
+class NpCmdAlterAllSlots( NpMasterCmd ):
+
+    def __init__( self, st ):
+
+        super( NpCmdAlterAllSlots , self ).__init__('$A7', 7 )
+
+        if not isinstance( st , NpSlotStatus ):
+            raise ValueError('Not valid state selected')
+
+        self.cmd_arg_1 = st.value
+
+    def parse_reply( self , reply ):
+        return True if reply == b'$A0' else False 
diff --git a/devauto/rps/hw/rpsgen.py b/devauto/rps/hw/rpsgen.py
index dea0119..e91d832 100644
--- a/devauto/rps/hw/rpsgen.py
+++ b/devauto/rps/hw/rpsgen.py
@@ -13,11 +13,11 @@ class RPSGen:
     def __init__(self, outlet_count):
         self.outlet_count = outlet_count
 
-    def turn_all_on():
+    def turn_all_on(self):
         pass
-    def turn_all_off():
+    def turn_all_off(self):
         pass
-    def turn_on_by_id(outlet_id):
+    def turn_on_by_id(self,outlet_id):
         pass
-    def turn_off_by_id(outlet_id):
+    def turn_off_by_id(self,outlet_id):
         pass
-- 
1.9.1



More information about the automated-testing mailing list