#!/usr/bin/env python3

import time
#import threading
#from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing.dummy import Process,Lock
#import multiprocessing
import sys
import MySQLdb as mdb
import signal
import os
import queue
import hashlib
import gc
import psutil
from functools import wraps
import re
import daemon
import lockfile
import logging
import logging.handlers
from logging import config
from functools import reduce
import traceback

##### START EDITABLE VARS #####

DEBUG=False
MIN_LOG_LEVEL=logging.INFO
DAEMON=True
FILE='/usr/lib/analytics-server/analytics/config.php'
LIMIT=70.0 # PERCENTAGE LOAD LIMIT
WARNING_MIN_HEAP = 256
##### END EDITABLE VARS #####

DEBUG_PRINT_ALIVE_COUNTER=60
THREADED=True
MAX_SLOW_CHECK_CLIENTS = 30
USE_FILE_EVENT_LOG=True
USE_FILE_EVENT_DB=False
USE_CONFIG_FILE_LOG=True
CONFIG_FILE_LOG='/var/run/analyticsd_config.log'
FILE_EVENT_LOG='/var/run/analyticsd_event.log'

if DEBUG:
    if MIN_LOG_LEVEL:
        loglevel=MIN_LOG_LEVEL
    else:
        loglevel=logging.DEBUG
else:
    loglevel=logging.INFO

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(module)s %(message)s'
            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
        },
    },
    'handlers': {
        'stdout': {
            'class': 'logging.StreamHandler',
            'stream': sys.stdout,
            'formatter': 'verbose',
        },
        'sys-logger6': {
            'class': 'logging.handlers.SysLogHandler',
            'address': '/dev/log',
            'facility': "local6",
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'analyticsd-logger': {
            'handlers': ['sys-logger6','stdout'],
            'level': loglevel,
            'propagate': True,
        },
    }
}
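# Logging is routed through the 'analyticsd-logger' logger defined above: every
# message goes both to stdout and to the syslog facility local6 via /dev/log,
# using the 'verbose' formatter. A minimal usage sketch (illustrative only):
#
#   logger = logging.getLogger('analyticsd-logger')
#   logger.info('worker started')   # -> "INFO <module> worker started"
#
# The printea() helper defined below wraps exactly this pattern, mapping its
# 'level' argument onto the corresponding logger method.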

def to_str(x):
    if isinstance(x,str):
        return to_utf(x).decode('unicode_escape')
    else:
        return x

def to_utf(x):
    if isinstance(x,str):
        return x.encode('utf-8')
    else:
        return x

def printea(msg="",level='critical'):
    if level == 'critical':
        logger.critical(msg)
    elif level == 'error':
        logger.error(msg)
    elif level == 'warning':
        logger.warning(msg)
    elif level == 'info':
        logger.info(msg)
    else:
        logger.debug(msg)

def keepalive(who=""):
    global DEBUG_PRINT_ALIVE_COUNTER
    t=str(int(time.time()))
    if DEBUG:
        pass
        # too verbose
        # printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
    else:
        if who == 'main':
            if DEBUG_PRINT_ALIVE_COUNTER > 0:
                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
            else:
                DEBUG_PRINT_ALIVE_COUNTER=60
                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')

    fp = open('/var/run/analyticsd.keepalive','w')
    fp.write(t)
    fp.close()

config.dictConfig(LOGGING)
logger = logging.getLogger('analyticsd-logger')

DBNAME=None
USER=None
PASS=None
IP=None

EMPTY_PAUSED_SLEEP=1
CHECK_LOAD_TIME=5
MAX_RETRIES=5
TIMED_ON=False

try:
    with open(FILE,'r') as f:
        for line in f:
            if not IP:
                IP = re.search(r"^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
                if IP:
                    IP = IP.group(1)
            if not DBNAME:
                DBNAME = re.search(r"^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
                if DBNAME:
                    DBNAME = DBNAME.group(1)
            if not USER:
                USER = re.search(r"^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
                if USER:
                    USER = USER.group(1)
            if not PASS:
                PASS = re.search(r"^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
                if PASS:
                    PASS = PASS.group(1)
    if not (IP and DBNAME and USER and PASS):
        printea("Couldn't get database configuration from {}".format(FILE))
    else:
        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
except Exception as e:
    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
    sys.exit(1)

if not (IP and DBNAME and USER and PASS):
    printea("Couldn't get database configuration from {}".format(FILE))
    sys.exit(1)
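# The block above pulls the MySQL credentials out of the analytics PHP config.
# The regexes only accept single-quoted \w+ values, so the expected lines look
# roughly like this (illustrative values, not the real ones):
#
#   $dbhost = 'localhost';
#   $dbname = 'analytics';
#   $dbuser = 'analytics';
#   $dbpass = 'secret';
#
# Values containing dots or dashes (e.g. an IP address such as 192.168.0.1)
# would not match \w+ and would leave the corresponding variable unset.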

if THREADED:
    THREADS=['main','server','client','desktop','other']
else:
    THREADS=['main']

class DB():
    def __init__(self,mon,t='main'):
        self._initialized=False
        self.t=t
        self.reconnect=0
        self.empty = False
        self.conn=None
        self.mon=mon
        self.q=queue.Queue()
        self.processed=0
        self.need_clean = False
        printea('Database worker {} initialized'.format(t),'info')

    def timed(func):
        @wraps(func)
        def wrapper(*args,**kwargs):
            if TIMED_ON:
                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
            ret=func(*args,**kwargs)
            if TIMED_ON:
                printea("End ({}): @{}".format(func.__name__,time.time()),'debug')
            return ret
        return wrapper

    def with_retry(func):
        @wraps(func)
        def wrapper(*args,**kwargs):
            if 'retry' not in kwargs:
                kwargs['retry']=1
            if 'mon' not in kwargs:
                kwargs['mon'] = None
            try:
                return func(*args,**kwargs)
            except Exception as e:
                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
                    printea("Fatal error in ({}), max retries exceeded".format(func.__name__))
                    if kwargs['mon']:
                        kwargs['mon'].term()
                        sys.exit(1)
                    return None
                else:
                    time.sleep(kwargs['retry']**2)
                    kwargs['retry']+=1
                    return wrapper(*args,**kwargs)
        return wrapper
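    # Note on with_retry: the wrapper injects 'retry' (attempt counter) and 'mon'
    # into kwargs, so decorated methods must accept **kwargs and callers may pass
    # mon=<Monitor> to allow a clean shutdown on fatal errors. Backoff is
    # quadratic: attempts sleep 1, 4, 9 and 16 seconds before retrying, so with
    # MAX_RETRIES=5 a persistently failing call gives up after roughly 30 seconds.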

    def with_debug(func):
        @wraps(func)
        def wrapper(*args,**kwargs):
            if 'query' in kwargs:
                printea("executing query: {}".format(kwargs['query']),'debug')
            return func(*args,**kwargs)
        return wrapper

    @with_debug
    def execute(self,*args,**kwargs):
        if 'query' not in kwargs:
            printea("Warning: execute called without query",'info')
            return None
        try:
            return self.cur.execute(kwargs['query'])
        except mdb.OperationalError as e:
            printea('({}) Operational error on mysql, error is: {}'.format(self.t,e),'error')
            self.init_db()
        except Exception as e:
            raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))

    def init_db(self):
        try:
            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
            self.conn.autocommit(False)
            self.cur = self.conn.cursor()
            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
            variables=self.check_server_variables()

            if variables:
                self.mon.server_variables=variables

            printea("Connected successfully {} thread_id={}".format(self.t,self.conn.thread_id()),'info')

        except mdb.Error as e:
            printea("Error {}: {}".format(e.args[0],e.args[1]))
            raise Exception(e)

    def check_server_variables(self):
        if self.t != 'main':
            return None
        else:
            printea('Getting server variables','info')
            result = {}
            self.cur.execute("show global variables")
            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
            if self.cur.rowcount > 0:
                for i in range(self.cur.rowcount):
                    var_name, var_value = self.cur.fetchone()
                    result.setdefault(var_name,var_value)
            return result

    def get_config(self):
        values={}
        self.cur.execute('select name,value from Config')
        for val in self.cur.fetchall():
            values.setdefault(val[0],val[1])
        return values

    @with_retry
    def put_config(self,*args,**kwargs):
        values = kwargs.get('values')
        vals=[]
        for x in values.keys():
            vals.append("('{}','{}')".format(x,values[x]))
        try:
            self.execute(query="insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
            self.conn.commit()
        except Exception as e:
            printea(e,'error')

    @with_retry
    def check_temporary_tables_size(self,*args,**kwargs):
        table_name=kwargs.get('table_name')
        if self.t != 'load':
            return None
        else:
            size = 0
            rows = 0
            self.execute(query='select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))

            res=self.cur.fetchone()
            size = float(res[0])
            rows = int(res[1])
            return (int(size),rows)

    def close_db(self):
        if self.conn:
            self.conn.close()
            printea("Closed connection {}".format(self.t),'info')

    def reduce_flavour(self,version,flavour):
        if version == '15':
            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
                return 'desktop'
            elif 'server' in flavour:
                return 'server'
            elif 'client' in flavour:
                return 'client'
        elif version == '16':
            if 'server' in flavour:
                return 'server'
            elif 'client' in flavour:
                return 'client'
            elif 'desktop' in flavour:
                return 'desktop'
        return 'other'

    def reduce_version(self,version):
        if version[0:2] in ['15','16']:
            return version[0:2]
        else:
            return 'other'
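    # The two reducers above normalise the raw strings reported by clients into
    # the small set of release/flavour buckets used in the aggregated tables.
    # Illustrative examples (made-up inputs):
    #
    #   reduce_version('16.02.1')               -> '16'
    #   reduce_version('14.06')                 -> 'other'
    #   reduce_flavour('16', 'edu-desktop-16')  -> 'desktop'
    #   reduce_flavour('15', 'client-heavy')    -> 'client'
    #   reduce_flavour('16', 'unknown')         -> 'other'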

    def gen_uuid(self,*args,**kwargs):
        try:
            string=to_utf(u'-'.join([str(x) for x in args]))
            h=hashlib.sha1(string)
            return int(h.hexdigest()[0:16],16)
        except Exception as e:
            printea(traceback.format_exc(),'critical')
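    # gen_uuid() derives a deterministic 64-bit integer key from its arguments:
    # they are joined with '-' into one string, SHA-1 hashed, and the first 16
    # hex digits of the digest are parsed as an int. The same (month, year,
    # user, version, flavour) tuple therefore always maps to the same uuid,
    # which lets the INSERT ... ON DUPLICATE KEY UPDATE statements below
    # aggregate repeated rows onto the same key. Example (illustrative):
    # gen_uuid(5, 2016, 'aula1', '16.02', 'desktop') hashes the string
    # '5-2016-aula1-16.02-desktop'.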

    @timed
    @with_retry
    def get_client(self,*args,**kwargs):
        try:
            query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu,ltsp,mode from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
            self.execute(query=query)
            ret =[]
            if self.cur.rowcount > 0:
                for i in range(self.cur.rowcount):
                    v_id,v_date,v_user,v_version,v_flavour,v_arch,v_mem,v_vga,v_cpu,v_ncpu,v_ltsp,v_mode=self.cur.fetchone()
                    version=self.reduce_version(v_version)
                    flavour=self.reduce_flavour(version,v_flavour)
                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
                    if not v_arch:
                        v_arch = 'NULL'
                    if not v_mem:
                        v_mem = 'NULL'
                    if not v_vga:
                        v_vga = 'NULL'
                    if not v_cpu:
                        v_cpu = 'NULL'
                    if not v_ncpu:
                        v_ncpu = 'NULL'
                    if v_ltsp == '1' or v_ltsp == True:
                        v_ltsp = 'TRUE'
                    elif v_ltsp == '0' or v_ltsp == False:
                        v_ltsp = 'FALSE'
                    else:
                        v_ltsp = 'NULL'
                    if not v_mode:
                        v_mode='NULL'

                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour,'arch':v_arch,'mem':v_mem,'vga':v_vga,'cpu':v_cpu,'ncpu':v_ncpu,'ltsp':v_ltsp,'mode':v_mode})
                return ret
            else:
                return True
        except Exception as e:
            raise Exception("Error getting client: {}".format(e))

    @timed
    @with_retry
    def get_apps(self,*args,**kwargs):
        if 'clients' not in kwargs:
            printea("Warning executed without named parameter clients",'info')
            return None
        ret = {}
        try:
            cids_to_rel_fla={}
            for c in kwargs['clients']:
                if c['id'] not in cids_to_rel_fla:
                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
                if c['flavour'] not in ret:
                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
            for c in kwargs['clients']:
                ret[c['flavour']]['clients'].append(c)

            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
            self.execute(query=query)
            if self.cur.rowcount > 0:
                for i in range(self.cur.rowcount):
                    row = self.cur.fetchone()
                    clid = row[0]
                    rel,fla = cids_to_rel_fla[clid]
                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
            return ret
        except Exception as e:
            raise Exception("Error getting apps: {}".format(e))

    @timed
    @with_retry
    def put_client(self,*args,**kwargs):
        if 'client_list' not in kwargs:
            raise Exception("Called without named parameter client_list")
        values= []
        for cli in kwargs['client_list']:
            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{},{},'{}')".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu'],cli['ltsp'],cli['mode']))
        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`,`ltsp`,`mode`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
        self.execute(query=query)
        return True

    @timed
    @with_retry
    def put_apps(self,*args,**kwargs):
        if 'apps' not in kwargs:
            raise Exception("Called without named parameter apps")
        app_list = {}
        for app in kwargs['apps']:
            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
            if str(app['uuid']) not in app_list:
                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
            else:
                app_list[str(app['uuid'])]['value']+=app['value']
        values = []
        for app in app_list:
            item=app_list[app]
            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
        self.execute(query=query)
        return True

    @timed
    @with_retry
    def del_client(self,*args,**kwargs):
        if 'client_list' not in kwargs:
            raise Exception("Called without named parameter client_list")
        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
        self.execute(query=query)
        return True

    @timed
    @with_retry
    def del_apps(self,*args,**kwargs):
        if 'client_list' not in kwargs:
            raise Exception("Called without named parameter client_list")
        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
        self.execute(query=query)
        return True

    @timed
    def reset_autoinc(self):
        try:
            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
            self.execute(query=query)
            ainc = self.cur.fetchone()[0]
            #query = "SELECT count(*) from tmp_clients"
            #self.execute(query=query)
            #cli_size=self.cur.fetchone()[0]
            #query = "SELECT count(*) from tmp_packages"
            #self.execute(query=query)
            #pkg_size=self.cur.fetchone()[0]
            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
            if ainc > 65500 and self.mon.all_queues_empty():
                #and cli_size == 0 and pkg_size == 0:
                query = "TRUNCATE TABLE tmp_clients"
                self.execute(query=query)
                query = "TRUNCATE TABLE tmp_packages"
                self.execute(query=query)
            return True
        except Exception as e:
            raise Exception("Error resetting auto_increment: {}".format(e))
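    # reset_autoinc() is housekeeping for the tmp_* staging tables: once the
    # AUTO_INCREMENT counter of tmp_packages climbs past 65500 and every worker
    # queue is drained, both staging tables are truncated so the counter starts
    # again from 1. The 65500 threshold suggests the id column is a 16-bit
    # (SMALLINT UNSIGNED, max 65535) key, though that is an assumption not
    # visible from this file.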


    @timed
    def process_main_thread(self,*args,**kwargs):
        if self.mon.slow_check_clients > 0 :
            self.mon.slow_check_clients -= 1
            time.sleep(1)
            return True
        else:
            self.mon.slow_check_clients = self.mon.default_slow_check_clients

        clis=self.get_client(mon=self.mon)
        if clis == True: #No clients found (empty)
            self.empty = True # if > 65500 auto_increment was reset
            self.mon.schedule(event='NO_MORE_CLIENTS')
            return False
        #clients found
        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
        #if clients were found, fetch their apps
        lapps = self.get_apps(clients=clis,mon=self.mon)
        #lapps can be empty
        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
        #if the deletion failed, the thread has already died
        lapps_tmp={'apps':[],'clients':[]}
        for fla in lapps:
            if THREADED:
                lapps[fla]['timestamp']=time.time()
                self.mon.db[fla].q.put(lapps[fla],True)
            else:
                lapps_tmp['apps'].extend(lapps[fla]['apps'])
                lapps_tmp['clients'].extend(lapps[fla]['clients'])
                self.mon.db['main'].q.put(lapps_tmp,True)
        #if DEBUG:
        self.processed+=len(clis)
        return True

    @timed
    def process_all_threads(self,*args,**kwargs):
        lapps=self.q.get(True)
        #print("Running {}".format(self.t))
        if THREADED:
            while (lapps['timestamp'] > self.mon.commited):
                time.sleep(0.001)
        if len(lapps['clients']) != 0:
            printea('Thread {} putting client'.format(self.t),'debug')
            #IF IT FAILS, AFTER RETRIES THE THREAD DIES
            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
                self.q.put(lapps,True) # USELESS
                return False # USELESS
            if len(lapps['apps']) != 0:
                printea('Thread {} putting clientapps'.format(self.t),'debug')
                if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
                    self.q.put(lapps,True) # USELESS
                    return False # USELESS
            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
                self.q.put(lapps,True) # USELESS
                return False # USELESS
        #if DEBUG:
        self.processed+=len(lapps['clients'])
        return True
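    # Note on the timestamp gate in process_all_threads(): each batch is stamped
    # by the main worker just before it is queued, and a flavour worker spins
    # (1 ms sleeps) until Monitor.commited, which is updated after every
    # successful commit, is newer than that stamp. The intent appears to be that
    # the main worker's transaction (the one that deleted the rows from
    # tmp_clients) has been committed before the aggregated rows are inserted.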

    def process(self,*args,**kwargs):
        keepalive(self.t)
        # too verbose: printea("Running thread {}".format(self.t),'debug')
        if self.t == 'main' and not self.mon.terminate:
            ret=self.process_main_thread(*args,**kwargs)
            if ret == False: #No more clients available
                return True
            if ret == True: #Clients were put on the queues; process more without waiting, the main queue is always empty
                if THREADED:
                    return True #Need to return to avoid setting the empty flag and waiting in the main thread

        #after this point, code for all threads
        if not self.q.empty():
            ret=self.process_all_threads(*args,**kwargs)
            if ret == False: # USELESS? threads have died
                printea("Error threads")
            return ret
        else:
            del self.q
            self.q = queue.Queue()
            self.empty = True
        return True

    def worker(self):
        printea("Starting worker {} processing".format(self.t),'info')
        while not (self.mon.terminate and self.empty):
            if self.mon.paused or self.empty:
                #if self.empty and self.t == 'main':
                #    self.reset_autoinc()
                if self.mon.paused:
                    printea("Paused by high load {}".format(self.t),'debug')
                # Too verbose
                #if self.empty:
                #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
                if self.empty:
                    self.empty = False
                time.sleep(EMPTY_PAUSED_SLEEP)
            else:
                try:
                    if self.conn == None:
                        if not self._initialized:
                            self._initialized = True
                            raise EnvironmentError('Initializing connector {}'.format(self.t))
                        else:
                            raise Warning('Connection not available')
                    else:
                        self.conn.begin()
                        if self.process():
                            self.conn.commit()
                            self.mon.commited=time.time()
                            self.reconnect = 0
                            if self.need_clean:
                                gc.collect()
                        else:
                            self.conn.rollback()
                except EnvironmentError as e:
                    printea(e,'info')
                    self.init_db()
                except Warning as e:
                    printea(e,'warning')
                    self.init_db()
                except Exception as e:
                    try:
                        if self.conn != None:
                            self.conn.rollback()
                    except:
                        printea("Can't rollback last actions",'info')
                        pass
                    #if e[0] != 2006:
                    #    printea("Exception processing worker({}): {}".format(self.t,e))
                    #if e[0] == 2006: # SERVER GONE AWAY
                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))

                    printea("Trying to recover connection ({}), {}".format(self.t,e))
                    if self.reconnect == 100:
                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
                        self.mon.term()
                    else:
                        self.reconnect+=1
                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
                        time.sleep(self.reconnect*self.reconnect)

                        try:
                            self.init_db()
                            printea("Recovered worker {} connection".format(self.t),'info')
                        except:
                            printea('Unable to initialize worker {}'.format(self.t))
                            pass
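    # Worker error handling in a nutshell: on any unexpected exception the
    # current transaction is rolled back and the worker tries to reconnect with
    # a quadratic backoff (attempt n sleeps n*n seconds). After 100 failed
    # reconnects it asks the Monitor to terminate the whole daemon. MySQL error
    # 2006 ("server has gone away") is the typical trigger, as the commented-out
    # code above suggests.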


class Monitor():
    def __init__(self):
        self.MAX_QUEUE_UTILIZATION = 100
        self.USE_MAX_QUEUES = True
        self.MAX_SELECT_WINDOW = (2 ** 13) +1
        self.MIN_SELECT_WINDOW = 32
        self.MEM_USED=0
        self.MAX_MEM=512
        self.MIN_FREE_MEM_SERVER=100
        self.USE_MAX_MEM=True
        self.lock = Lock()
        self.terminate = False
        self.finished = False
        self.paused = False
        self.select_window = self.MIN_SELECT_WINDOW
        self.commited = time.time()
        self.procesed = 0
        self.procesed_per_sec = [0]*10
        self.load = 0
        self.server_variables = None # initialized by main worker
        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
        self.default_slow_check_clients = 1
        self.slow_check_clients = self.default_slow_check_clients
        self.server_mem = 0
        self.loadlist = [ 0.0 ] * 100
        self.max_heap = None
        self.last_events = []

        signal.signal(signal.SIGQUIT,self.term)
        signal.signal(signal.SIGTERM,self.term)
        signal.signal(signal.SIGINT,self.term)

        self.db = {}
        self.threads = {}
        for x in THREADS:
            self.db[x] = DB(self,x)
            #try:
            #    self.db[x].init_db()
            #except Exception as e:
            #    printea('Error initializing database connections: {}'.format(str(e)))
            #    sys.exit(1)
        self.cfg= None
        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')

    def windowctl(self, *args, **kwargs):
        if args[0] == '+':
            if self.select_window*2 < self.MAX_SELECT_WINDOW:
                self.select_window*=2
                self.select_window=int(self.select_window)
        if args[0] == '-':
            if self.select_window > self.MIN_SELECT_WINDOW:
                self.select_window/=2
                self.select_window=int(self.select_window)
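    # windowctl('+'|'-') doubles or halves select_window, the LIMIT used when
    # the main worker polls tmp_clients. It is clamped to the range
    # [MIN_SELECT_WINDOW, MAX_SELECT_WINDOW) = [32, 8193), so the effective
    # sizes are the powers of two 32, 64, ..., 4096, 8192.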

    def slowcheckctl(self, *args, **kwargs):
        if args[0] == 'reset':
            self.default_slow_check_clients = 0
        if args[0] == '+':
            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
                if self.default_slow_check_clients == 0:
                    self.default_slow_check_clients = 1
                else:
                    self.default_slow_check_clients = self.default_slow_check_clients * 2
        if args[0] == '-':
            if self.default_slow_check_clients > 0:
                self.default_slow_check_clients = self.default_slow_check_clients / 2
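    # slowcheckctl() tunes how lazily the main worker polls for new clients:
    # default_slow_check_clients is the number of one-second sleep iterations
    # process_main_thread() performs between polls. 'reset' drops it to 0
    # (poll immediately), '+' doubles it up to MAX_SLOW_CHECK_CLIENTS (30),
    # '-' halves it. It grows while tmp_clients stays empty and is reset as
    # soon as clients show up again.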

    def append_event_log(self,*args,**kwargs):
        if DEBUG:
            if USE_FILE_EVENT_LOG:
                try:
                    fp = open(FILE_EVENT_LOG,'a')
                except:
                    fp = False
            else:
                fp = False
            separator=':'
            for x in args:
                the_time=str(int(time.time()))
                the_value=str(x)
                the_string=the_time+separator+the_value
                self.last_events.append(the_string)
                if fp:
                    fp.write('{}\n'.format(the_string))
            if fp:
                fp.close()

    def schedule(self, *args, **kwargs):
        if kwargs['event'] == 'NO_MORE_CLIENTS':
            self.append_event_log(kwargs['event'])
            self.windowctl('-')
            self.slowcheckctl('+')

        if kwargs['event'] == 'HAVE_CLIENTS':
            self.append_event_log(kwargs['event'])
            if kwargs['nclients'] == self.select_window:
                self.windowctl('+')
            else:
                self.windowctl('-')
            self.slowcheckctl('reset')
            #
            # if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
            #     self.windowctl('+')
            #
            # if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
            #     self.windowctl('-')
            #     self.slowcheckctl('+')
            # else:
            #     self.windowctl('+')
            #
            # elif kwargs['nclients'] < self.select_window/2:
            #     self.windowctl('-')
            #
            # self.slowcheckctl('reset')

        if kwargs['event']=='CHECK_SANITY':
            # CPU SETTINGS
            if not self.terminate and self.load > LIMIT:
                self.append_event_log('LOAD_LIMIT_REACHED')
                self.paused = True
            else:
                self.paused = False

            # DB MEM SETTINGS
            if self.max_heap and self.temporary_tables_size['sum']:
                if self.temporary_tables_size['sum'] > self.max_heap * 0.3:
                    self.append_event_log('MAX_HEAP_ALERT')
                    self.windowctl('+')
                    self.slowcheckctl('-')
                    if self.paused:
                        #printea('Hitting max temporary table size unpausing','critical')
                        self.paused = False
            # SERVER MEM
            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
                self.append_event_log('SERVER_MEM_ALERT')
                #printea('Hitting max memory from server collecting and reducing window','critical')
                self.windowctl('-')
                self.slowcheckctl('+')
                self.USE_MAX_QUEUES=True
                for x in THREADS:
                    self.db[x].need_clean=True
                gc.collect()
            else:
                # self.USE_MAX_QUEUES=False
                for x in THREADS:
                    self.db[x].need_clean=False

            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
                self.append_event_log('MAX_MEM_ALERT')
                self.windowctl('-')
                self.slowcheckctl('+')

            # QUEUES
            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
                self.append_event_log('MAX_QUEUES_REACHED')
                self.windowctl('+')


    def get_cpu_load(self):
        self.loadlist.append(psutil.cpu_percent())
        self.loadlist=self.loadlist[1:]
        avg=0.0
        for x in self.loadlist:
            avg+=x
        return round(avg/100.0,2)
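    # get_cpu_load() keeps a sliding window of the last 100 psutil.cpu_percent()
    # samples (one per second, taken from the get_load() loop below) and returns
    # their arithmetic mean, still expressed as a percentage. That value is what
    # CHECK_SANITY compares against LIMIT (70.0) to pause or resume the workers.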

    def term(self,*args,**kwargs):
        printea("Terminating the program, please wait...",'info')
        self.terminate=True

    def prepare_threads(self):
        global DAEMON

        self.threads['load']=Process(target=self.get_load)
        self.threads['load'].daemon = DAEMON
        self.threads['load'].start()
        for x in THREADS:
            self.threads[x]=Process(target=self.db[x].worker)
            self.threads[x].daemon = DAEMON
            self.threads[x].start()

    def get_mem_usage(self):
        process = psutil.Process(os.getpid())
        mem = process.memory_info()[0] / float(2 ** 20)
        return mem

    def get_server_free_mem(self):
        mem = psutil.virtual_memory()
        return mem.free / (2 ** 20)

    def print_stats(self):
        global CHECK_LOAD_TIME
        # if DEBUG:
        if True:
            out="Processed: "
            out2=''
            total=0
            for x in THREADS:
                out2+='{}={} '.format(x,self.db[x].processed)
                self.cfg.store('processed '+x,self.db[x].processed)
                if THREADED:
                    if x != 'main':
                        total += self.db[x].processed
                else:
                    total += self.db[x].processed
            out += "TOTAL={} {}".format(total,out2)
            self.cfg.store('processed total',total)
            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
            self.procesed_per_sec.append(proc_per_sec)
            self.procesed_per_sec=self.procesed_per_sec[-10:]
            f=lambda x,y:x+y
            suma_proc=reduce(f,self.procesed_per_sec)
            self.cfg.store('processing at',int(suma_proc/10))
            self.procesed=total
            total = 0
            if THREADED:
                out+="Queues: "
                sizes=self.get_q_sizes()
                out2=''
                for x in sizes:
                    self.cfg.store('queue '+x,sizes[x])
                    out2+='{}={} '.format(x,sizes[x])
                    total += sizes[x]
                self.cfg.store('queue totals',total)
                out+="TOTAL={} {}".format(total,out2)
            self.cfg.store('select window',self.select_window)
            self.cfg.store('mem used',self.MEM_USED)
            self.cfg.store('load',self.load)
            if (self.paused):
                self.cfg.store('paused',1)
            else:
                self.cfg.store('paused',0)
            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
            self.cfg.store('temp_clients_rows',int(self.temporary_tables_rows['clients']))
            self.cfg.store('temp_packages_rows',int(self.temporary_tables_rows['packages']))
            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
            self.cfg.store('default_slow_check_clients',int(self.default_slow_check_clients))
            self.cfg.store('slow_check_clients',int(self.slow_check_clients))
            if DEBUG:
                if USE_FILE_EVENT_DB:
                    max_event=len(self.last_events)
                    if max_event > 20:
                        max_event = 20
                    for i in range(0,max_event):
                        self.cfg.store('event{:02d}'.format(i),self.last_events[i])
                    self.last_events=self.last_events[-20:]
            if THREADED:
                printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
            else:
                printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))

    def get_q_sizes(self):
        sizes={}
        for x in THREADS:
            sizes[x]=self.db[x].q.qsize()
        return sizes

    def sum_q_sizes(self):
        sizes=self.get_q_sizes()
        f=lambda x,y: x+y
        return reduce(f,[sizes[x] for x in sizes])

    def all_queues_empty(self):
        sizes=self.get_q_sizes()
        for x in sizes:
            if sizes[x] != 0:
                return False
        return True

    def get_load(self):
        # runs on a separate thread
        db = DB(self,'load')
        db.init_db()
        self.cfg = Config(db)
        ctime=0
        while not (self.terminate and self.finished): #and not self.paused
            self.cfg.store('keepalive',int(time.time()))
            time.sleep(1)
            self.load=self.get_cpu_load()
            ctime+=1
            self.schedule(event='CHECK_SANITY')
            if ctime > CHECK_LOAD_TIME:
                self.cfg.write()
                ctime=0.0

                self.MEM_USED=self.get_mem_usage()
                self.server_mem=self.get_server_free_mem()

                if self.server_variables and 'max_heap_table_size' in self.server_variables:
                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
                    if self.max_heap < WARNING_MIN_HEAP:
                        printea("******************** MIN HEAP IS TOO SMALL -> {} < {} ********************".format(self.max_heap,WARNING_MIN_HEAP),'warning')

                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='tmp_clients')
                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='tmp_packages')
                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='Client_Versions')
                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='RecvPackages')
                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
                qempty = self.all_queues_empty()
                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
                    db.reset_autoinc()
                self.print_stats()
                #self.load=os.getloadavg()[0]
                if qempty:
                    gc.collect()
            #end if
        #end while
        db.close_db()

    def start(self):
        self.prepare_threads()

    def end(self):
        for x in self.db:
            self.threads[x].join()
            self.db[x].close_db()
        self.finished = True
        self.print_stats()


class Config():
    def __init__(self,connection):
        self._db = connection
        self.read()

    def store(self,var,value):
        var=var.replace(' ','_')
        if isinstance(value,str):
            if value.isnumeric():
                setattr(self,var,int(value))
            else:
                setattr(self,var,str(value))
        else:
            setattr(self,var,int(value))

    def write(self):
        values={}
        for x in self.get_internal_vars():
            values.setdefault(str(x)[:20],str(getattr(self,x))[:45])
        if self._db:
            self._db.put_config(values=values)
        the_string="CONFIGURATION\n"
        if USE_CONFIG_FILE_LOG:
            fnbuild = lambda kv : str(kv[0])+'='+str(kv[1])+"\n"
            the_string+=reduce((lambda x,y: x+y) ,map(fnbuild,values.items()))
            try:
                with open(CONFIG_FILE_LOG,'w') as fp:
                    fp.write(the_string)
            except:
                pass
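    # Config.write() publishes every non-underscore attribute of this object:
    # it upserts them into the Config table (via DB.put_config) and, when
    # USE_CONFIG_FILE_LOG is set, also dumps them as key=value lines to
    # /var/run/analyticsd_config.log. Names are truncated to 20 characters and
    # values to 45, presumably to fit the Config table columns (the column
    # widths are an assumption, they are not declared in this file).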

    def read(self):
        if self._db:
            config=self._db.get_config()
            for key in config.keys():
                if config[key].isnumeric():
                    setattr(self,key,int(config[key]))
                else:
                    setattr(self,key,config[key])
        else:
            printea('No config yet')

    def get_internal_vars(self):
        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))

    def print(self):
        for v in self.get_internal_vars():
            print('{} = {}'.format(v,getattr(self,v)))


def main(*args,**kwargs):
    gc.enable()
    if DAEMON:
        fp = open('/var/run/analyticsd.pid','w')
        fp.write(str(os.getpid()))
        fp.close()
    m = Monitor()
    m.start()
    printea("start done",'info')
    while not m.terminate:
        time.sleep(0.5)
    m.end()
    printea("Exiting...",'info')

if __name__ == "__main__":
    exit = 0
    keyword='analyticsd'
    interpreter='python3'
    for proc in psutil.process_iter():
        a=False
        b=False
        for argument in proc.cmdline():
            #print('{} {} {}'.format(proc.cmdline,keyword,argument[-len(keyword):]))
            if interpreter in argument[-len(interpreter):]:
                a = True
            if keyword in argument[-len(keyword):]:
                b = True
        if a and b:
            exit = exit +1
    if exit > 1:
        printea('Another daemon is running','error')
        sys.exit(1)

    lck = '/var/run/analyticsd'
    if DAEMON:
        if os.path.isfile(lck+'.lock'):
            printea('Lockfile {} detected, unable to start'.format(lck),'error')
            sys.exit(1)
        else:
            try:
                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
                    main()
            except Exception as e:
                printea(e)
                sys.exit(1)
    else:
        main()
---|