153 lines
6.2 KiB
Python
153 lines
6.2 KiB
Python
|
|
from json import loads
|
||
|
|
import db_handle
|
||
|
|
from jsonwind import json_concat
|
||
|
|
import mqtt_comm
|
||
|
|
import time
|
||
|
|
import threading
|
||
|
|
global store_count #global variable, so no need to run sql_count
|
||
|
|
global lock
|
||
|
|
lock = threading.Lock()
|
||
|
|
#thread functions are defined here, as well as global variables
|
||
|
|
class Publisher():
|
||
|
|
def __init__(self,publish_flag,pub_arc_flag,topic,MQTT_client,qos,daemonTh,ThName,source,store_flag,order,n):
|
||
|
|
global lock
|
||
|
|
self.topic=topic;
|
||
|
|
self.qos=qos
|
||
|
|
self.source=source
|
||
|
|
self.client_mqtt = MQTT_client
|
||
|
|
self.publish_flag=publish_flag
|
||
|
|
self.pub_arc_flag=pub_arc_flag
|
||
|
|
self.daemon=daemonTh
|
||
|
|
self.threadName= ThName
|
||
|
|
self.source=source
|
||
|
|
self.store_flag = store_flag
|
||
|
|
self.order =order
|
||
|
|
self.n = n
|
||
|
|
if (daemonTh==True):
|
||
|
|
self.t1=threading.Thread
|
||
|
|
|
||
|
|
|
||
|
|
#function to assure that data has been published
|
||
|
|
def getTopic(self):
|
||
|
|
return self.topic
|
||
|
|
|
||
|
|
def publish_and_wait(self,count,message):
|
||
|
|
if (loads(message)["time"] is not ""):
|
||
|
|
if (self.client_mqtt.connected):
|
||
|
|
(r_code,msg_id)=self.client_mqtt.client.publish(self.topic,message,qos = self.qos)
|
||
|
|
if(r_code==0 and self.source =='archive'):
|
||
|
|
if(msg_id%500==0):
|
||
|
|
print("Message",msg_id, "was delivered with success from the", self.source)
|
||
|
|
return True
|
||
|
|
if(r_code==0 and self.source is not'archive'):
|
||
|
|
print("Message",msg_id, "was delivered with success from the", self.source)
|
||
|
|
return True
|
||
|
|
if(r_code is not 0 and self.store_flag):
|
||
|
|
print("Error no:",r_code,"saving message to archive")
|
||
|
|
self.write_line_db_sub(message)
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
if(self.store_flag):
|
||
|
|
self.write_line_db_sub(message)
|
||
|
|
|
||
|
|
def publish(self,message=None):
|
||
|
|
global store_count
|
||
|
|
if (self.publish_flag ==True and self.pub_arc_flag == False):
|
||
|
|
self.publish_and_wait(store_count,json_concat(message,self.topic))
|
||
|
|
if (self.publish_flag ==True and self.pub_arc_flag==True and self.daemon==True and store_count>2):
|
||
|
|
self.publish_from_archive()
|
||
|
|
if (self.publish_flag ==True and self.pub_arc_flag==True and self.daemon==False):
|
||
|
|
self.check_publish_stored_files()
|
||
|
|
|
||
|
|
|
||
|
|
#function to write one line to the DB
|
||
|
|
def write_line_db_sub(self,msg_pack):
|
||
|
|
global store_count
|
||
|
|
global lock
|
||
|
|
lock.acquire()
|
||
|
|
data = loads(msg_pack);
|
||
|
|
sqlite_db=db_handle.Sqlite3_DB();
|
||
|
|
sqlite_db.cursor.execute("INSERT into %s values (?, ?);"%(sqlite_db.tablename),[data["time"] , msg_pack])
|
||
|
|
sqlite_db.conn.commit();
|
||
|
|
store_count=store_count+1
|
||
|
|
if (store_count >1):
|
||
|
|
print("Message ",store_count," stored");
|
||
|
|
sqlite_db.conn.close()
|
||
|
|
lock.release()
|
||
|
|
return;
|
||
|
|
|
||
|
|
#function to publish from archive lines and delete them after
|
||
|
|
|
||
|
|
def check_publish_stored_files(self):
|
||
|
|
|
||
|
|
if(self.client_mqtt.connected):
|
||
|
|
starter = time.time()#count time to execute the function
|
||
|
|
global store_count
|
||
|
|
sqlite_db=db_handle.Sqlite3_DB()
|
||
|
|
|
||
|
|
if(store_count>0):
|
||
|
|
if (self.order=='oldest'):
|
||
|
|
print("SQL Archive has",store_count,"rows.")
|
||
|
|
if (store_count >self.n):
|
||
|
|
end= self.n;
|
||
|
|
else:
|
||
|
|
end= store_count
|
||
|
|
i=0
|
||
|
|
|
||
|
|
if (self.order=='oldest'): # if the published files are gonna be the oldest (publish from archive)
|
||
|
|
|
||
|
|
for row in sqlite_db.select_oldest(end):
|
||
|
|
lock.acquire()
|
||
|
|
if(self.publish_and_wait(store_count,json_concat(row[0],self.topic))):
|
||
|
|
store_count = store_count-1
|
||
|
|
i=i+1
|
||
|
|
time.sleep(0.002)
|
||
|
|
if (lock.locked()):
|
||
|
|
lock.release()
|
||
|
|
|
||
|
|
sqlite_db.remove_oldests_from_db(i)
|
||
|
|
time_publish=time.time()-starter
|
||
|
|
|
||
|
|
# print("############################################################")
|
||
|
|
# print("time taken to publish",end,self.order," numbers was",time_publish)
|
||
|
|
# print("############################################################")
|
||
|
|
|
||
|
|
|
||
|
|
if (self.order=='newest'): # if the published files are gonna be the newest (publish actual data from sensor)
|
||
|
|
|
||
|
|
end = 1 #only allows 1 line to be selected, since there's no sleep
|
||
|
|
for row in sqlite_db.select_newest(end):
|
||
|
|
if(self.publish_and_wait(store_count,json_concat(row[0],self.topic))):
|
||
|
|
store_count = store_count-1
|
||
|
|
i=i+1
|
||
|
|
sqlite_db.remove_newest_from_db(i)#delete line
|
||
|
|
sqlite_db.conn.commit()
|
||
|
|
sqlite_db.conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
return
|
||
|
|
#thread that runs when publishing from archive
|
||
|
|
def publish_from_archive(self):
|
||
|
|
ThNames= [];
|
||
|
|
for thread in threading.enumerate():
|
||
|
|
ThNames.append(thread.name);
|
||
|
|
if (self.threadName in ThNames):
|
||
|
|
print("Thread",self.threadName,"is busy")#check if not empty
|
||
|
|
else:
|
||
|
|
self.t1 = threading.Thread(name =self.threadName,target=self.check_publish_stored_files, args=())
|
||
|
|
self.t1.setDaemon(True);
|
||
|
|
self.t1.start()
|
||
|
|
|
||
|
|
def write_line_db_sub(self,msg_pack):
|
||
|
|
global store_count
|
||
|
|
data = loads(msg_pack);
|
||
|
|
sqlite_db=db_handle.Sqlite3_DB();
|
||
|
|
sqlite_db.cursor.execute("INSERT into %s values (?, ?);"%(sqlite_db.tablename),[data["time"] , msg_pack])
|
||
|
|
sqlite_db.conn.commit();
|
||
|
|
store_count=store_count+1
|
||
|
|
if (store_count>1):
|
||
|
|
print("Message ",store_count," stored");
|
||
|
|
sqlite_db.conn.close()
|
||
|
|
return;
|