173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
|
|
"""
|
|
Created on Tue May 5 15:37:56 2020
|
|
|
|
@author: nits
|
|
connection:
|
|
green and white = B
|
|
yellow and brown = A
|
|
|
|
"""
|
|
|
|
|
|
import windsensor
|
|
from jsonwind import json_wma,json_debug,json_concat
|
|
from json import loads
|
|
import sighand
|
|
import mqtt_comm
|
|
import time
|
|
import avg_values
|
|
import configfile
|
|
import db_handle
|
|
from threading import Timer
|
|
from os import listdir
|
|
from os.path import isfile, join
|
|
from os import remove
|
|
import threading
|
|
from threadf import Publisher
|
|
import threadf
|
|
|
|
threadf.store_count=0
|
|
|
|
def main():
|
|
#extra threads and parse file
|
|
conf = configfile.Configuration()
|
|
|
|
#initialization of variables and objects
|
|
|
|
|
|
signal_handler=sighand.SignalHandler()
|
|
wind = windsensor.WindSensor()
|
|
speed_mean= avg_values.AverageValue()
|
|
|
|
# topic = conf.topic
|
|
days=conf.days_archive
|
|
topic_debug = conf.topic_debug
|
|
debug_mode = conf.debug_mode
|
|
qos_mqtt=conf.qos
|
|
start = time.time()
|
|
numberOfHosts=conf.numberOfHosts
|
|
numberOfPublishers=conf.numberOfPublishers
|
|
|
|
#check the hosts and publisher objects
|
|
|
|
print("Nr of hosts: ", numberOfHosts)
|
|
hosts = []
|
|
print("Nr of publishers: ", numberOfPublishers)
|
|
publishers = []
|
|
|
|
for i in range(1, numberOfHosts+1):
|
|
name = "Host"+str(i)
|
|
port = conf.config[name].getint('port')
|
|
ident = conf.config[name].get('client_name')
|
|
connect = conf.config[name].getboolean('connect')
|
|
host_name = conf.config[name].get('name')
|
|
|
|
host = mqtt_comm.MQTT_Client(host_name, ident, port,connect)
|
|
hosts.append(host)
|
|
|
|
for host in hosts:
|
|
if(host.connect):
|
|
host.client_connect()
|
|
host.client.loop_start()
|
|
#create the publisher objects and link them to the hosts
|
|
for i in range(1, numberOfPublishers+1):
|
|
name = "Publisher"+str(i)
|
|
|
|
pubflag=conf.config[name].getboolean('pubflag')
|
|
arcflag=conf.config[name].getboolean('arc_flag')
|
|
daemonTh=conf.config[name].getboolean('daemonTh')
|
|
st_flag=conf.config[name].getboolean('store_flag')
|
|
topic = conf.config[name].get('topic')
|
|
host_n=conf.config[name].getint('host_n')
|
|
thName=conf.config[name].get('threadName')
|
|
source=conf.config[name].get('source')
|
|
order=conf.config[name].get('order')
|
|
n=conf.config[name].getint('n')
|
|
print("publisher",i,"daemon =",daemonTh,"archive=",arcflag,"publish=",pubflag)
|
|
Publisher_n = threadf.Publisher(pubflag,arcflag,topic,hosts[host_n-1],qos_mqtt,daemonTh,thName,source,st_flag,order,n)
|
|
publishers.append(Publisher_n)
|
|
|
|
|
|
|
|
#connect to SQL
|
|
while True:
|
|
try:
|
|
sqinit=db_handle.Sqlite3_DB();
|
|
break;
|
|
except db_handle.sqlite3.Error as er:
|
|
print('SQLite error: %s' % (' '.join(er.args)))
|
|
print("Exception class is: ", er.__class__)
|
|
time.sleep(3)
|
|
threadf.store_count=sqinit.sql_count();
|
|
print("Archive has INITIALLY ",threadf.store_count," rows")
|
|
|
|
time_json=""
|
|
while (True):
|
|
# if(threadf.quit==False):
|
|
if(time.time() - start > 1.0 or ((time_json)is "")): #Wait 1 second to run loop, if sensor comm fails, will try faster so data won't be lost
|
|
if(threadf.lock.locked()):
|
|
threadf.lock.release()#release lock to run the main thread
|
|
#print("Lock released for running the main routines")
|
|
threadf.lock.acquire()
|
|
start = time.time()
|
|
|
|
#load data from sensors and calculate mean
|
|
|
|
time_act = wind.readDateTime()
|
|
windspeed_act = wind.readWindSpeed()
|
|
wind_direct= wind.readWindDirection()
|
|
wind_temp=wind.readTemp()
|
|
|
|
speed_mean.insert_value_list2(windspeed_act)
|
|
speed_mean.insert_value_list10(windspeed_act)
|
|
speed_m2=speed_mean.get_m2()
|
|
speed_max2=speed_mean.get_max2()
|
|
speed_m10=speed_mean.get_m10()
|
|
speed_max10=speed_mean.get_max10()
|
|
|
|
rm_number=1800 #number of removed lines when requested, 1800 = half an hour
|
|
if(threadf.store_count>(days*86400+rm_number)):#number of samples per day(size in lines)
|
|
startc = time.time()
|
|
|
|
print('Too many files on the archive, removing older than',days,'days size')
|
|
sqinit.remove_oldests_from_db(rm_number);#clean DB if DB is too large
|
|
threadf.store_count=threadf.store_count-rm_number;
|
|
timec=time.time()-startc
|
|
print("time taken to remove",rm_number," lines from the old data was: ",timec)#computes and displays the time taken
|
|
|
|
|
|
|
|
json_pack = json_wma(time_act,windspeed_act,wind_direct,wind_temp,speed_m2,speed_max2,speed_m10,speed_max10)#generate json pack with values
|
|
time_json=loads(json_pack)["time"];
|
|
if (time_json is not ""):
|
|
publishers[0].write_line_db_sub(json_pack)#write line to db using the first publisher object
|
|
|
|
for pub in publishers:
|
|
pub.publish(json_concat(json_pack,pub.topic))
|
|
#write line to db
|
|
|
|
|
|
|
|
|
|
#debug mode
|
|
if (debug_mode ==1):
|
|
errorcounter = wind.errorcounter
|
|
kompasswinkel_act = wind.readCompassAngle()
|
|
json_pack_debug = json_debug(time_act,windspeed_act,wind_direct,errorcounter,kompasswinkel_act) #print information on console
|
|
print(json_pack_debug)
|
|
|
|
if(threadf.lock.locked()):#release thread lock
|
|
threadf.lock.release()
|
|
#check if sensor was read, in case not, it will skip the sleep in order to get the reading faster
|
|
|
|
# print("Cycle duration:",time.time()-start,"s")# print time
|
|
try:
|
|
if(time_json is not ""):
|
|
time.sleep(1-(time.time()-start)) #treats exception if time bigger than 1.0s
|
|
except:
|
|
print("Cycle>1s lasted:",time.time()-start,"seconds")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |