# -*- coding: utf-8 -*- import requests import time import sys from optparse import OptionParser import re import nicelogger log = nicelogger.log __VERSION__ = "1.0" class SolariumWeb(object): def __init__(self, ip): self.ip = ip self.password = 1111 self.status_regex = r'^(.+?);([-+]?\d+);([-+]?\d+);([-+]?\d+)'; self.getparam_regex = r'^(.+?);([-+]?\d+);(.+?)$'; self.last_status = {'state': 'idle', 'timetoend': 0, 'inputtime': -1, 'inputmoney': -1} def setPassword(self, password): self.password = password def lastStatus(self): return self.last_status def reqStatus(self): r = requests.get(self.ip + '/status') #log(r.headers['content-type']) #log(r.encoding) #log(r.status_code) #log(r.text) if r.status_code == 200: s2 = r.text.split('\r\n')[1] #log(s2) vals = re.findall(self.status_regex, s2) #log(vals) self.last_status['state'] = vals[0][0] self.last_status['timetoend'] = int(vals[0][1]) self.last_status['inputtime'] = int(vals[0][2]) self.last_status['inputmoney'] = int(vals[0][3]) return r.status_code def postStart(self, id = 0): payload = 'Command;Password;User_ID\r\nstart;%04d;%d\r\n' % (self.password, id) r = requests.post(self.ip + '/command', data=payload) #log(r.status_code) #log(r.text) if r.status_code == 200: s2 = r.text.split('\r\n')[1] vals = re.findall(self.status_regex, s2) self.last_status['state'] = vals[0][0] self.last_status['timetoend'] = int(vals[0][1]) self.last_status['inputtime'] = int(vals[0][2]) self.last_status['inputmoney'] = int(vals[0][3]) return r.status_code def getParamValue(self, param = ' ', ind = 0): payload = 'Command;Password;Param;Index\r\nget_param;%04d;%s;%d\r\n' % (self.password, param, ind) r = requests.post(self.ip + '/command', data=payload) if r.status_code == 200: txt = r.text s1 = txt.split('\r\n')[0] if (s1 != 'Param;Index;Value'): log('Invalid responce format: %s' % s1) s2 = txt.split('\r\n')[1] vals = re.findall(self.getparam_regex, s2) if (vals[0][0] != param): log('Invalid param name %s' % vals[0][0]) return None if (int(vals[0][1]) != ind): log('Invalid param index %s' % vals[0][1]) return None return vals[0][2].encode('latin1').decode('cp1251') return None def setParamValue(self, param = ' ', ind = 0, val = 0): payload = 'Command;Password;Param;Index;Value\r\nset_param;%04d;%s;%d;%s\r\n' % (self.password, param, ind, val) r = requests.post(self.ip + '/command', data=payload) if r.status_code == 200: txt = r.text s1 = txt.split('\r\n')[0] if (s1 != 'Param;Index;Value'): log('Invalid responce format: %s' % s1) s2 = txt.split('\r\n')[1] vals = re.findall(self.getparam_regex, s2) if (vals[0][0] != param): log('Invalid param name %s' % vals[0][0]) return None if (int(vals[0][1]) != ind): log('Invalid param index %s' % vals[0][1]) return None return vals[0][2].encode('latin1').decode('cp1251') return None def postCansel(self, id = 0, str1 = ' ', str2 = ''): payload = 'Command;Password;User_ID;Line1;Line2\r\ncansel;%04d;%d;%s;%s\r\n' % (self.password, id, str1.encode('cp1251'), str2.encode('cp1251')) r = requests.post(self.ip + '/command', data=payload) #log(r.status_code) #log(r.text) if r.status_code == 200: s2 = r.text.split('\r\n')[1] vals = re.findall(self.status_regex, s2) self.last_status['state'] = vals[0][0] self.last_status['timetoend'] = int(vals[0][1]) self.last_status['inputtime'] = int(vals[0][2]) self.last_status['inputmoney'] = int(vals[0][3]) return r.status_code def postAbort(self): payload = 'Command;Password\r\nabort;%04d\r\n' % (self.password) r = requests.post(self.ip + '/command', data=payload) #log(r.status_code) #log(r.text) if r.status_code == 200: s2 = r.text.split('\r\n')[1] vals = re.findall(self.status_regex, s2) self.last_status['state'] = vals[0][0] self.last_status['timetoend'] = int(vals[0][1]) self.last_status['inputtime'] = int(vals[0][2]) self.last_status['inputmoney'] = int(vals[0][3]) return r.status_code def postUserIdCommand(self, id = 0, paytime = -1, money = -1): payload = 'Command;Password;User_ID;BalanceTime;BalanceMoney\r\nuser_req_time;%04d;%d;%d;%d\r\n' % (self.password, id, paytime, money) r = requests.post(self.ip + '/command', data=payload) log(r.status_code) log(r.text) if r.status_code == 200: s2 = r.text.split('\r\n')[1] vals = re.findall(self.status_regex, s2) self.last_status['state'] = vals[0][0] self.last_status['timetoend'] = int(vals[0][1]) self.last_status['inputtime'] = int(vals[0][2]) self.last_status['inputmoney'] = int(vals[0][3]) return r.status_code def main(): parser = OptionParser(version="Solarium test tool ver. %s" % __VERSION__) parser.add_option("-a", "--ip-addr", dest="ip", default='http://192.168.0.250', help="solarium ip address") parser.add_option("-i", "--client-id", dest="client_id", default='0', type='int', help="client id number") parser.add_option("-t", "--client-time", dest="client_time", default='-1', type='int', help="client max accessible time") parser.add_option("-m", "--client-money", dest="client_money", default='-1', type='int', help="client max accessible money") parser.add_option("-r", "--reset", dest="reset", default=False, action="store_true", help="make controller reset before work") try: (options, args) = parser.parse_args() except: sys.exit(-1) if not options.ip: sys.exit(-1) s = SolariumWeb(options.ip) log('Start work.') if options.reset: log('Reset solarium state.') s.postAbort() #t = time.time() #while (time.time() - t < 1.0): s.reqStatus() #log(s.lastStatus()) if s.lastStatus()['state'] != 'idle': log('Device is busy. Exit.') log(s.lastStatus()) sys.exit(-2) #log(s.setParamValue('ZummerEnableDesc', 0, 1)) #log(s.setParamValue('SolariumMinTimeDesc', 0, 5)) #log(s.setParamValue('SolariumMaxTimeDesc', 0, 15)) log('ZummerEnableDesc = ' + s.getParamValue('ZummerEnableDesc', 0)) log('SolariumMinTimeDesc = ' + s.getParamValue('SolariumMinTimeDesc', 0)) log('SolariumMaxTimeDesc = ' + s.getParamValue('SolariumMaxTimeDesc', 0)) log('JournalErrorNumberDescCode = ' + s.getParamValue('JournalErrorNumberDescCode', 0)) log('JournalErrorNumberDescCode = ' + s.getParamValue('JournalErrorNumberDescCode', 1)) log('JournalErrorNumberDescCode = ' + s.getParamValue('JournalErrorNumberDescCode', 2)) log('JournalErrorNumberDescCode = ' + s.getParamValue('JournalErrorNumberDescCode', 3)) #sys.exit(0) if options.client_time > 0 or options.client_money > 0: log('Request client input time.') if s.postUserIdCommand(options.client_id, options.client_time, options.client_money) != 200: log('Error post command.') log(s.lastStatus()) sys.exit(-3) else: log('Well done.') log(s.lastStatus()) sys.exit(0) time.sleep(1.0) s.reqStatus() if s.lastStatus()['state'] != 'waiting_input': log('Error state.') log(s.lastStatus()) sys.exit(-4) t = time.time() while (time.time() - t < 120.0): time.sleep(1.0) s.reqStatus() st = s.lastStatus() if st['state'] == 'waiting_input': if st['timetoend'] > 0: log('Waiting input... Timeout %d sec.' % st['timetoend']) continue else: log('Quit by timeout. No user input.') log(s.lastStatus()) sys.exit(-5) elif st['state'] == 'waiting_ack': log('User input finished.') break else: log('Error state.') log(s.lastStatus()) sys.exit(-6) s.reqStatus() st = s.lastStatus() #log(s.lastStatus()) log('User requests %d seconds.' % st['inputtime']) #if st['inputtime'] > options.client_time: # log('Requested time longer then possible.') # log(s.lastStatus()) # sys.exit(-7) if st['inputtime'] == 0: log('Requested time is 0.') log(s.lastStatus()) sys.exit(-8) s.postStart() #log(s.lastStatus()) #s.postCansel(0, 'ОТМЕНА'.decode('utf8'), 'ОПЕРАЦИИ'.decode('utf8')) solarium_was_busy = 0 t = time.time() while (time.time() - t < options.client_time + 1.0): time.sleep(1.0) s.reqStatus() st = s.lastStatus() #log(s.lastStatus()) if st['state'] == 'waiting_ack': log('Controller is waiting for ack %d sec.' % st['timetoend']) elif st['state'] == 'pre_pause': solarium_was_busy = 1 log('Pause before on %d sec.' % st['timetoend']) elif st['state'] == 'sunburn': solarium_was_busy = 1 log('Sunburn active %d sec.' % st['timetoend']) elif st['state'] == 'ventilation': solarium_was_busy = 1 log('Ventilation %d sec.' % st['timetoend']) else: if st['state'] == 'idle': if not solarium_was_busy: log('Error. Invalid state.') log(s.lastStatus()) sys.exit(-9) else: break else: log('Error. Invalid state.') sys.exit(-9) log('Well done.') sys.exit(0) if __name__ == '__main__': main()