300 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import requests
import time
import sys
from optparse import OptionParser
import re
import nicelogger
log = nicelogger.log
__VERSION__ = "1.0"
class SolariumWeb(object):
    """HTTP client for a solarium controller's text command interface.

    Status is read with ``GET <ip>/status`` and commands are sent with
    ``POST <ip>/command``.  Request bodies are CRLF-terminated lines of
    ';'-separated fields: a header line naming the fields, then one data
    line.  Replies are parsed the same way: the first line is skipped (or,
    for parameter replies, compared with an expected header) and the second
    line carries the values.

    The most recently parsed status is cached in ``last_status`` and is
    returned by lastStatus().
    """

    # First line expected in replies to get_param / set_param.
    _PARAM_REPLY_HEADER = 'Param;Index;Value'

    def __init__(self, ip, timeout=None):
        """Create a client.

        ip      -- base URL of the controller; '/status' and '/command'
                   are appended to it verbatim.
        timeout -- optional timeout handed to every ``requests`` call;
                   None (the default) waits indefinitely, as before.
        """
        self.ip = ip
        self.timeout = timeout
        # Formatted into commands with '%04d', so it must be an integer.
        self.password = 1111
        # Status data line: state;timetoend;inputtime;inputmoney
        self.status_regex = r'^(.+?);([-+]?\d+);([-+]?\d+);([-+]?\d+)'
        # Param data line: name;index;value
        self.getparam_regex = r'^(.+?);([-+]?\d+);(.+?)$'
        self.last_status = {'state': 'idle', 'timetoend': 0, 'inputtime': -1, 'inputmoney': -1}

    def setPassword(self, password):
        """Set the integer password sent with every command."""
        self.password = password

    def lastStatus(self):
        """Return the cached status dict (the live object, not a copy)."""
        return self.last_status

    # -- private helpers ---------------------------------------------------

    def _parseStatus(self, text):
        """Parse a status reply body into a dict with the last_status keys.

        Raises IndexError when the body has no second line or that line
        does not match ``status_regex``.
        """
        data_line = text.split('\r\n')[1]
        fields = re.findall(self.status_regex, data_line)[0]
        return {
            'state': fields[0],
            'timetoend': int(fields[1]),
            'inputtime': int(fields[2]),
            'inputmoney': int(fields[3]),
        }

    def _storeStatus(self, response):
        """Update the cached status from a 200 response; return its HTTP code.

        For any other status code the cache is left untouched.
        """
        if response.status_code == 200:
            self.last_status.update(self._parseStatus(response.text))
        return response.status_code

    def _postCommand(self, payload):
        """POST a command payload to the controller and return the response."""
        return requests.post(self.ip + '/command', data=payload, timeout=self.timeout)

    def _canselPayload(self, id, str1, str2):
        """Build the cp1251-encoded body of the 'cansel' command.

        The text is assembled first and encoded once, so the two display
        lines are sent as cp1251 bytes rather than as the repr of a bytes
        object (which '%s' would insert on Python 3).
        """
        text = 'Command;Password;User_ID;Line1;Line2\r\ncansel;%04d;%d;%s;%s\r\n' % (
            self.password, id, str1, str2)
        return text.encode('cp1251')

    def _parseParamReply(self, text, param, ind):
        """Extract the value from a get_param / set_param reply body.

        Returns the value string, or None (after logging) when the data line
        is missing, does not match ``getparam_regex``, or names a different
        parameter or index than the one requested.
        """
        lines = text.split('\r\n')
        if lines[0] != self._PARAM_REPLY_HEADER:
            # Logged only: the data line is still examined afterwards.
            log('Invalid responce format: %s' % lines[0])
        matches = re.findall(self.getparam_regex, lines[1]) if len(lines) > 1 else []
        if not matches:
            log('Unparsable param response: %r' % text)
            return None
        name, index, value = matches[0]
        if name != param:
            log('Invalid param name %s' % name)
            return None
        if int(index) != ind:
            log('Invalid param index %s' % index)
            return None
        # Re-interpret the text as cp1251: take the latin1 bytes back and
        # decode them as cp1251 (presumably the body was decoded as latin1
        # by requests -- TODO confirm against the device's headers).
        return value.encode('latin1').decode('cp1251')

    def _paramCommand(self, payload, param, ind):
        """Send a parameter command; return the echoed value or None."""
        r = self._postCommand(payload)
        if r.status_code != 200:
            return None
        return self._parseParamReply(r.text, param, ind)

    # -- public API --------------------------------------------------------

    def reqStatus(self):
        """GET the current status, cache it, and return the HTTP status code.

        Raises IndexError if a 200 reply cannot be parsed.
        """
        r = requests.get(self.ip + '/status', timeout=self.timeout)
        return self._storeStatus(r)

    def postStart(self, id = 0):
        """Send 'start' for user ``id``; return the HTTP status code.

        On a 200 reply the cached status is refreshed from the body.
        """
        payload = 'Command;Password;User_ID\r\nstart;%04d;%d\r\n' % (self.password, id)
        return self._storeStatus(self._postCommand(payload))

    def getParamValue(self, param = ' ', ind = 0):
        """Read parameter ``param`` at index ``ind``.

        Returns the value as a string, or None on a non-200 reply or when
        the reply is malformed or does not echo the requested name/index.
        """
        payload = 'Command;Password;Param;Index\r\nget_param;%04d;%s;%d\r\n' % (self.password, param, ind)
        return self._paramCommand(payload, param, ind)

    def setParamValue(self, param = ' ', ind = 0, val = 0):
        """Write ``val`` to parameter ``param`` at index ``ind``.

        Returns the value echoed by the controller as a string, or None on
        a non-200 reply or when the reply is malformed or does not echo the
        requested name/index.
        """
        payload = 'Command;Password;Param;Index;Value\r\nset_param;%04d;%s;%d;%s\r\n' % (self.password, param, ind, val)
        return self._paramCommand(payload, param, ind)

    def postCansel(self, id = 0, str1 = ' ', str2 = ''):
        """Send 'cansel' for user ``id`` with two text lines (Line1, Line2).

        The payload is sent cp1251-encoded.  Returns the HTTP status code;
        on a 200 reply the cached status is refreshed from the body.
        """
        payload = self._canselPayload(id, str1, str2)
        return self._storeStatus(self._postCommand(payload))

    def postAbort(self):
        """Send 'abort'; return the HTTP status code.

        On a 200 reply the cached status is refreshed from the body.
        """
        payload = 'Command;Password\r\nabort;%04d\r\n' % (self.password)
        return self._storeStatus(self._postCommand(payload))

    def postUserIdCommand(self, id = 0, paytime = -1, money = -1):
        """Send 'user_req_time' with the user's time and money balances.

        The HTTP status code and the raw reply body are always logged.
        Returns the HTTP status code; on a 200 reply the cached status is
        refreshed from the body.
        """
        payload = 'Command;Password;User_ID;BalanceTime;BalanceMoney\r\nuser_req_time;%04d;%d;%d;%d\r\n' % (self.password, id, paytime, money)
        r = self._postCommand(payload)
        log(r.status_code)
        log(r.text)
        return self._storeStatus(r)
def main():
    """Run a command-line test session against a solarium controller.

    Sequence: optionally abort any running session, require the 'idle'
    state, log a few controller parameters, send the client's time/money
    balance, wait for the user to enter a time on the device, start the
    session and follow the controller's states until it is idle again.

    Always terminates through sys.exit(): 0 on success, -1 for bad
    command-line arguments, -2 .. -9 for the failures logged below.
    """
    parser = OptionParser(version="Solarium test tool ver. %s" % __VERSION__)
    parser.add_option("-a", "--ip-addr", dest="ip",
                      default='http://192.168.0.250',
                      help="solarium ip address")
    parser.add_option("-i", "--client-id", dest="client_id",
                      default=0,
                      type='int',
                      help="client id number")
    parser.add_option("-t", "--client-time", dest="client_time",
                      default=-1,
                      type='int',
                      help="client max accessible time")
    parser.add_option("-m", "--client-money", dest="client_money",
                      default=-1,
                      type='int',
                      help="client max accessible money")
    parser.add_option("-r", "--reset", dest="reset",
                      default=False,
                      action="store_true",
                      help="make controller reset before work")
    try:
        options, _args = parser.parse_args()
    except SystemExit as exc:
        # optparse leaves through sys.exit(): 0 after --help/--version,
        # non-zero after a usage error.  Keep the successful exit as it is
        # and map only real errors to this tool's -1.
        if exc.code in (0, None):
            raise
        sys.exit(-1)
    if not options.ip:
        sys.exit(-1)

    s = SolariumWeb(options.ip)
    log('Start work.')
    if options.reset:
        log('Reset solarium state.')
        s.postAbort()

    s.reqStatus()
    if s.lastStatus()['state'] != 'idle':
        log('Device is busy. Exit.')
        log(s.lastStatus())
        sys.exit(-2)

    # Log a few controller parameters.  '%s' formatting is used so that a
    # failed read (getParamValue returns None) is logged instead of raising
    # TypeError on string concatenation.
    params_to_show = (
        ('ZummerEnableDesc', 0),
        ('SolariumMinTimeDesc', 0),
        ('SolariumMaxTimeDesc', 0),
        ('JournalErrorNumberDescCode', 0),
        ('JournalErrorNumberDescCode', 1),
        ('JournalErrorNumberDescCode', 2),
        ('JournalErrorNumberDescCode', 3),
    )
    for name, index in params_to_show:
        log('%s = %s' % (name, s.getParamValue(name, index)))

    if options.client_time > 0 or options.client_money > 0:
        log('Request client input time.')
        if s.postUserIdCommand(options.client_id, options.client_time, options.client_money) != 200:
            log('Error post command.')
            log(s.lastStatus())
            sys.exit(-3)
    else:
        # Neither a time nor a money balance was given: nothing to request.
        log('Well done.')
        log(s.lastStatus())
        sys.exit(0)

    time.sleep(1.0)
    s.reqStatus()
    if s.lastStatus()['state'] != 'waiting_input':
        log('Error state.')
        log(s.lastStatus())
        sys.exit(-4)

    # Poll once a second, for up to 120 s, until the user finishes input.
    # NOTE(review): if the 120 s elapse while the controller is still in
    # 'waiting_input', the loop simply ends and the code below proceeds as
    # if input had finished -- confirm whether that is intended.
    t = time.time()
    while time.time() - t < 120.0:
        time.sleep(1.0)
        s.reqStatus()
        st = s.lastStatus()
        if st['state'] == 'waiting_input':
            if st['timetoend'] > 0:
                log('Waiting input... Timeout %d sec.' % st['timetoend'])
                continue
            log('Quit by timeout. No user input.')
            log(s.lastStatus())
            sys.exit(-5)
        elif st['state'] == 'waiting_ack':
            log('User input finished.')
            break
        else:
            log('Error state.')
            log(s.lastStatus())
            sys.exit(-6)

    s.reqStatus()
    st = s.lastStatus()
    log('User requests %d seconds.' % st['inputtime'])
    if st['inputtime'] == 0:
        log('Requested time is 0.')
        log(s.lastStatus())
        sys.exit(-8)

    s.postStart()

    # Follow the session until the controller returns to 'idle'.
    # NOTE(review): the loop is bounded by client_time + 1 second, so it
    # does not run at all when only --client-money was given (client_time
    # is then -1) -- confirm the intended bound.
    solarium_was_busy = 0
    t = time.time()
    while time.time() - t < options.client_time + 1.0:
        time.sleep(1.0)
        s.reqStatus()
        st = s.lastStatus()
        state = st['state']
        if state == 'waiting_ack':
            log('Controller is waiting for ack %d sec.' % st['timetoend'])
        elif state == 'pre_pause':
            solarium_was_busy = 1
            log('Pause before on %d sec.' % st['timetoend'])
        elif state == 'sunburn':
            solarium_was_busy = 1
            log('Sunburn active %d sec.' % st['timetoend'])
        elif state == 'ventilation':
            solarium_was_busy = 1
            log('Ventilation %d sec.' % st['timetoend'])
        elif state == 'idle':
            if not solarium_was_busy:
                # Idle without ever having been busy: the session never ran.
                log('Error. Invalid state.')
                log(s.lastStatus())
                sys.exit(-9)
            break
        else:
            log('Error. Invalid state.')
            sys.exit(-9)

    log('Well done.')
    sys.exit(0)
# Run the test scenario only when executed as a script, not on import.
if __name__ == "__main__":
    main()