Changeset 5926 for lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd
- Timestamp:
- Sep 25, 2017, 6:26:11 PM (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd
r5560 r5926 19 19 import daemon 20 20 import lockfile 21 21 import logging 22 import logging.handlers 23 from logging import config 24 25 ##### START EDITABLE VARS ##### 26 27 DEBUG=False 28 DAEMON=True 22 29 FILE='/usr/lib/analytics-server/analytics/config.php' 30 LIMIT=5.0 31 32 ##### END EDITABLE VARS ##### 33 34 DEBUG_PRINT_ALIVE_COUNTER=10 35 THREADED=True 36 37 if DEBUG: 38 loglevel=logging.DEBUG 39 else: 40 loglevel=logging.INFO 41 42 LOGGING = { 43 'version': 1, 44 'disable_existing_loggers': False, 45 'formatters': { 46 'verbose': { 47 'format': '%(levelname)s %(module)s %(message)s' 48 #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s' 49 }, 50 }, 51 'handlers': { 52 'stdout': { 53 'class': 'logging.StreamHandler', 54 'stream': sys.stdout, 55 'formatter': 'verbose', 56 }, 57 'sys-logger6': { 58 'class': 'logging.handlers.SysLogHandler', 59 'address': '/dev/log', 60 'facility': "local6", 61 'formatter': 'verbose', 62 }, 63 }, 64 'loggers': { 65 'analyticsd-logger': { 66 'handlers': ['sys-logger6','stdout'], 67 'level': loglevel, 68 'propagate': True, 69 }, 70 } 71 } 72 73 74 def printea(msg="",level='critical'): 75 if level == 'critical': 76 logger.critical(msg) 77 elif level == 'info': 78 logger.info(msg) 79 else: 80 logger.debug(msg) 81 82 def keepalive(who=""): 83 global DEBUG_PRINT_ALIVE_COUNTER 84 t=str(int(time.time())) 85 if DEBUG: 86 printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug') 87 else: 88 if who == 'main': 89 if DEBUG_PRINT_ALIVE_COUNTER > 0: 90 DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1 91 else: 92 DEBUG_PRINT_ALIVE_COUNTER=10 93 printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info') 94 95 fp = open('/var/run/analyticsd.keepalive','w'); 96 fp.write(t) 97 fp.close() 98 99 config.dictConfig(LOGGING) 100 logger = logging.getLogger('analyticsd-logger') 101 23 102 DBNAME=None 24 103 USER=None … … 26 105 IP=None 27 106 28 DAEMON=True 29 LIMIT=5.0 30 DEBUG=0 31 THREADED=True 107 32 108 EMPTY_PAUSED_SLEEP=10 33 109 CHECK_LOAD_TIME=5 … … 55 131 PASS = PASS.group(1) 56 132 if not (IP or DBNAME or USER or PASS): 57 print "Couldn't get database configuration from {}".format(FILE)133 printea("Couldn't get database configuration from {}".format(FILE)) 58 134 else: 59 if DEBUG: 60 print "Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS); 135 printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug') 61 136 except Exception as e: 62 print "Couldn't parse {} Error:\"{}\"".format(FILE,str(e))137 printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e))) 63 138 sys.exit(1) 64 139 65 140 if not (IP or DBNAME or USER or PASS): 66 print "Couldn't get database configuration from {}".format(FILE)141 printea("Couldn't get database configuration from {}".format(FILE)) 67 142 sys.exit(1) 68 143 … … 80 155 self.q=Queue.Queue() 81 156 self.processed=0 157 printea('Database worker {} initialized'.format(t),'info') 82 158 83 159 def timed(func): … … 85 161 def wrapper(*args,**kwargs): 86 162 if TIMED_ON: 87 print "Start({}): @{}".format(func.__name__,time.time())163 printea("Start({}): @{}".format(func.__name__,time.time()),'debug') 88 164 ret=func(*args,**kwargs) 89 165 if TIMED_ON: 90 print "End ({}): @{}".format(func.__name__,time.time())166 printea("End ({}): @{}".format(func.__name__,time.time()),'debug') 91 167 return ret 92 168 return wrapper … … 102 178 return func(*args,**kwargs) 103 179 except Exception as e: 104 print "Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e))180 printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e))) 105 181 if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate): 106 print "Fatal error in ({}), max retries exceded".format(func.__name__)182 printea("Fatal error in ({}), max retries exceded".format(func.__name__)) 107 183 if kwargs['mon']: 108 184 kwargs['mon'].term() … … 119 195 @wraps(func) 120 196 def wrapper(*args,**kwargs): 121 if DEBUG and DEBUG > 2: 122 if 'query' in kwargs: 123 print "executing query: {}".format(kwargs['query']) 197 if 'query' in kwargs: 198 printea("executing query: {}".format(kwargs['query']),'debug') 124 199 return func(*args,**kwargs) 125 200 return wrapper … … 128 203 def execute(self,*args,**kwargs): 129 204 if 'query' not in kwargs: 130 print "Warning execute called whithout query"205 printea("Warning execute called whithout query",'info') 131 206 return None 132 207 try: … … 141 216 self.cur = self.conn.cursor() 142 217 self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED") 143 if DEBUG: 144 print "Connected succesfully {}".format(self.t) 218 printea("Connected succesfully {}".format(self.t),'info') 145 219 146 220 except mdb.Error, e: 147 print "Error {}: {}".format(e.args[0],e.args[1])221 printea("Error {}: {}".format(e.args[0],e.args[1])) 148 222 raise Exception(e) 149 223 … … 151 225 if self.conn: 152 226 self.conn.close() 153 if DEBUG: 154 print "Closed connection {}".format(self.t) 227 printea("Closed connection {}".format(self.t),'info') 155 228 156 229 def reduce_flavour(self,version,flavour): … … 204 277 def get_apps(self,*args,**kwargs): 205 278 if 'clients' not in kwargs: 206 print "Warning executed without named parameter clients"279 printea("Warning executed without named parameter clients",'info') 207 280 return None 208 281 ret = {} … … 292 365 self.execute(query=query) 293 366 pkg_size=self.cur.fetchone()[0] 294 if DEBUG and DEBUG > 1: 295 print "Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()) 367 printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug') 296 368 if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0: 297 369 query = "TRUNCATE TABLE tmp_clients" … … 354 426 355 427 def process(self,*args,**kwargs): 428 keepalive(self.t) 356 429 # code only for main thread 357 if DEBUG and DEBUG > 1: 358 print "Running thread {}".format(self.t) 430 printea("Running thread {}".format(self.t),'debug') 359 431 if self.t == 'main' and not self.mon.terminate: 360 432 ret=self.process_main_thread(*args,**kwargs) … … 369 441 ret=self.process_all_threads(*args,**kwargs) 370 442 if ret == False: # USELESS? THREADS was died 371 print "Error threads"443 printea("Error threads") 372 444 return ret 373 445 else: … … 380 452 if self.empty and self.t == 'main': 381 453 self.reset_autoinc() 382 if DEBUG and DEBUG > 1 andself.mon.paused:383 print "Paused by high load {}".format(self.t)384 if DEBUG and DEBUG > 1 andself.empty:385 print "Empty queue {} sleeping by now".format(self.t)454 if self.mon.paused: 455 printea("Paused by high load {}".format(self.t),'debug') 456 if self.empty: 457 printea("Empty queue {} sleeping by now".format(self.t),'debug') 386 458 if self.empty: 387 459 self.empty = False 388 460 time.sleep(EMPTY_PAUSED_SLEEP) 389 461 else: 390 self.conn.begin()391 462 try: 463 self.conn.begin() 392 464 if self.process(): 393 465 self.conn.commit() … … 397 469 self.conn.rollback() 398 470 except Exception as e: 399 self.conn.rollback() 471 try: 472 self.conn.rollback() 473 except: 474 printea("Can't rollback last actions",'info') 475 pass 400 476 if e[0] != 2006: 401 print "Exception processing: {}".format(e)477 printea("Exception processing: {}".format(e)) 402 478 else: 403 479 if self.reconnect == 100: 404 print "Lost connection to database"480 printea("Lost connection to database") 405 481 self.mon.term() 406 482 else: 407 483 self.reconnect+=1 408 time.sleep(self.reconnect*2) 484 printea('Reconnect to mysql({}) sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect)) 485 time.sleep(self.reconnect*self.reconnect) 409 486 try: 410 487 self.init_db() 411 488 except: 489 printea('Unable to initialize worker {}'.format(self.t)) 412 490 pass 413 491 … … 440 518 self.db[x].init_db() 441 519 except Exception as e: 442 print 'Error initializing database connections: {}'.format(str(e))520 printea('Error initializing database connections: {}'.format(str(e))) 443 521 sys.exit(1) 522 printea("Monitor initialized with {} threads".format(len(THREADS)),'info') 444 523 445 524 def schedule(self,*args,**kwargs): … … 468 547 469 548 def term(self,*args,**kwargs): 470 print "Begin kill the program, wait please..."549 printea("Begin kill the program, wait please...",'info') 471 550 self.terminate=True 472 551 … … 516 595 out+="TOTAL={} {}".format(total,out2) 517 596 if THREADED: 518 sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 597 printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug') 598 #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 519 599 else: 520 sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 600 printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug') 601 #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 521 602 522 603 def get_q_sizes(self): … … 572 653 m = Monitor() 573 654 m.start() 574 print "start done"655 printea("start done",'info') 575 656 while not m.terminate: 576 657 time.sleep(0.5) 577 658 m.end() 578 print "Exitting..."659 printea("Exitting...",'info') 579 660 580 661 if __name__ == "__main__": 581 662 if DAEMON: 582 with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analyticsd') ):663 with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]): 583 664 main() 584 665 else:
Note: See TracChangeset
for help on using the changeset viewer.