Changeset 6767 for lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd
- Timestamp:
- Feb 5, 2018, 2:19:57 PM (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd
r5932 r6767 1 #!/usr/bin/python 1 #!/usr/bin/python3 2 2 3 3 import time … … 10 10 import signal 11 11 import os 12 import Queue12 import queue 13 13 import hashlib 14 14 import gc … … 22 22 import logging.handlers 23 23 from logging import config 24 from functools import reduce 25 import traceback 24 26 25 27 ##### START EDITABLE VARS ##### 26 28 27 29 DEBUG=False 30 MIN_LOG_LEVEL=logging.INFO 28 31 DAEMON=True 29 32 FILE='/usr/lib/analytics-server/analytics/config.php' 30 LIMIT= 5.033 LIMIT=70.0 # PERCENTAGE LOAD LIMIT 31 34 32 35 ##### END EDITABLE VARS ##### 33 36 34 DEBUG_PRINT_ALIVE_COUNTER= 1037 DEBUG_PRINT_ALIVE_COUNTER=60 35 38 THREADED=True 36 39 37 40 if DEBUG: 38 loglevel=logging.DEBUG 41 if MIN_LOG_LEVEL: 42 loglevel=MIN_LOG_LEVEL 43 else: 44 loglevel=logging.DEBUG 39 45 else: 40 46 loglevel=logging.INFO … … 71 77 } 72 78 79 def to_str(x): 80 if isinstance(x,str): 81 return to_utf(x).decode('unicode_escape') 82 else: 83 return x 84 85 def to_utf(x): 86 if isinstance(x,str): 87 return x.encode('utf-8') 88 else: 89 return x 73 90 74 91 def printea(msg="",level='critical'): … … 90 107 DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1 91 108 else: 92 DEBUG_PRINT_ALIVE_COUNTER= 10109 DEBUG_PRINT_ALIVE_COUNTER=60 93 110 printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info') 94 111 … … 105 122 IP=None 106 123 107 108 EMPTY_PAUSED_SLEEP=10 124 EMPTY_PAUSED_SLEEP=1 109 125 CHECK_LOAD_TIME=5 110 126 MAX_RETRIES=5 … … 150 166 def __init__(self,mon,t='main'): 151 167 self.t=t 152 self.reconnect= 1168 self.reconnect=0 153 169 self.empty = False 154 170 self.conn=None 155 171 self.mon=mon 156 self.q= Queue.Queue()172 self.q=queue.Queue() 157 173 self.processed=0 174 self.need_wait = False 175 self.need_clean = False 158 176 printea('Database worker {} initialized'.format(t),'info') 159 177 … … 217 235 self.cur = self.conn.cursor() 218 236 self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED") 237 variables=self.check_server_variables() 238 if variables: 239 self.mon.server_variables=variables 219 240 printea("Connected succesfully {}".format(self.t),'info') 220 241 221 except mdb.Error ,e:242 except mdb.Error as e: 222 243 printea("Error {}: {}".format(e.args[0],e.args[1])) 223 244 raise Exception(e) 245 246 def check_server_variables(self): 247 if self.t != 'main': 248 return None 249 else: 250 printea('Getting server variables','info') 251 result = {} 252 self.cur.execute("show global variables") 253 printea('Setting {} vars'.format(self.cur.rowcount),'debug') 254 if self.cur.rowcount > 0: 255 for i in range(self.cur.rowcount): 256 var_name, var_value = self.cur.fetchone() 257 result.setdefault(var_name,var_value) 258 return result 259 260 def get_config(self): 261 values={} 262 self.cur.execute('select name,value from Config') 263 for val in self.cur.fetchall(): 264 values.setdefault(val[0],val[1]) 265 return values 266 267 def put_config(self,values): 268 vals=[] 269 for x in values.keys(): 270 vals.append("('{}','{}')".format(x,values[x])) 271 try: 272 self.cur.execute("insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals))) 273 self.conn.commit() 274 except Exception as e: 275 print(e) 276 277 def check_temporary_tables_size(self,table_name): 278 if self.t != 'load': 279 return None 280 else: 281 size = 0 282 rows = 0 283 self.cur.execute('select round(((data_length + index_length - data_free) / 1024 / 1024),2) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name)) 284 285 res=self.cur.fetchone() 286 size = float(res[0]) 287 rows = int(res[1]) 288 return (int(size),rows) 224 289 225 290 def close_db(self): … … 252 317 253 318 def gen_uuid(self,*args,**kwargs): 254 return int(hashlib.sha1('-'.join([str(x) for x in args])).hexdigest()[0:16],16) 319 try: 320 string=to_utf(u'-'.join([str(x) for x in args])) 321 h=hashlib.sha1(string) 322 return int(h.hexdigest()[0:16],16) 323 except Exception as e: 324 print(traceback.format_exc()) 255 325 256 326 @timed … … 258 328 def get_client(self,*args,**kwargs): 259 329 try: 260 query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format( self.mon.select_window)330 query="SELECT id,date,user,version,sabor from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window)) 261 331 self.execute(query=query) 262 332 ret =[] … … 357 427 def reset_autoinc(self): 358 428 try: 359 query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_ clients'".format(DBNAME)429 query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME) 360 430 self.execute(query=query) 361 431 ainc = self.cur.fetchone()[0] … … 379 449 @timed 380 450 def process_main_thread(self,*args,**kwargs): 451 if self.need_wait: 452 time.sleep(EMPTY_PAUSED_SLEEP) 381 453 clis=self.get_client(mon=self.mon) 382 454 if clis == True: #No clients found (empty) … … 400 472 lapps_tmp['clients'].extend(lapps[fla]['clients']) 401 473 self.mon.db['main'].q.put(lapps_tmp,True) 402 if DEBUG:474 #if DEBUG: 403 475 self.processed+=len(clis) 404 476 return True … … 422 494 self.q.put(lapps,True) # USELESS 423 495 return False # USELESS 424 if DEBUG:496 #if DEBUG: 425 497 self.processed+=len(lapps['clients']) 426 498 return True … … 431 503 printea("Running thread {}".format(self.t),'debug') 432 504 if self.t == 'main' and not self.mon.terminate: 505 433 506 ret=self.process_main_thread(*args,**kwargs) 434 507 if ret == False: #No more clients available … … 445 518 return ret 446 519 else: 520 del self.q 521 self.q = queue.Queue() 447 522 self.empty = True 448 523 return True … … 471 546 self.mon.commited=time.time() 472 547 self.reconnect = 0 548 if self.need_clean: 549 gc.collect() 473 550 else: 474 551 self.conn.rollback() … … 480 557 printea("Can't rollback last actions",'info') 481 558 pass 482 if e[0] != 2006:483 printea("Exception processing worker({}): {}".format(self.t,e))484 if e[0] == 2006: # SERVER GONE AWAY485 printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))559 #if e[0] != 2006: 560 # printea("Exception processing worker({}): {}".format(self.t,e)) 561 #if e[0] == 2006: # SERVER GONE AWAY 562 # printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e)) 486 563 printea("Trying to recover connection ({})".format(self.t)) 487 564 if self.reconnect == 100: … … 504 581 self.MAX_QUEUE_UTILIZATION = 100 505 582 self.USE_MAX_QUEUES = False 506 self.MAX_SELECT_WINDOW = (2 ** 12) +1 583 self.MAX_SELECT_WINDOW = (2 ** 13) +1 584 self.MIN_SELECT_WINDOW = 32 507 585 self.MEM_USED=0 508 586 self.MAX_MEM=512 587 self.MIN_FREE_MEM_SERVER=100 509 588 self.USE_MAX_MEM=True 510 589 self.lock = Lock() … … 512 591 self.finished = False 513 592 self.paused = False 514 self.select_window = 1593 self.select_window = self.MIN_SELECT_WINDOW 515 594 self.commited = time.time() 516 595 self.procesed = 0 517 596 self.procesed_per_sec = [0]*10 597 self.load = False 598 self.server_variables = None # initialized by main worker 599 self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 } 600 self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 } 601 self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 } 602 self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 } 603 604 self.server_mem = None 605 self.loadlist = [ 0.0 ] * 100 518 606 signal.signal(signal.SIGQUIT,self.term) 519 607 signal.signal(signal.SIGTERM,self.term) … … 529 617 # printea('Error initializing database connections: {}'.format(str(e))) 530 618 # sys.exit(1) 619 self.cfg= None 531 620 printea("Monitor initialized with {} threads".format(len(THREADS)),'info') 532 621 533 622 def schedule(self,*args,**kwargs): 534 623 if kwargs['event']=='NO_MORE_CLIENTS': 535 if self.select_window > 1:624 if self.select_window > self.MIN_SELECT_WINDOW: 536 625 self.select_window/=2 626 self.select_window=int(self.select_window) 537 627 if kwargs['event']=='HAVE_CLIENTS': 538 628 if kwargs['nclients'] == self.select_window: 539 if self.USE_MAX_QUEUES 540 if self.select_window > 1:629 if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION: 630 if self.select_window > self.MIN_SELECT_WINDOW: 541 631 self.select_window/=2 632 self.select_window=int(self.select_window) 542 633 if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM: 543 if self.select_window > 1:634 if self.select_window > self.MIN_SELECT_WINDOW: 544 635 self.select_window/=2 636 self.select_window=int(self.select_window) 545 637 else: 546 638 if self.select_window*2 < self.MAX_SELECT_WINDOW: 547 639 self.select_window*=2 640 self.select_window=int(self.select_window) 548 641 elif kwargs['nclients']<self.select_window/2: 549 642 self.select_window/=2 643 self.select_window=int(self.select_window) 644 645 if kwargs['event']=='CHECK_SANITY': 646 #nprocs=multiprocessing.cpu_count() 647 #limit=nprocs*LIMIT 648 649 self.load=self.get_cpu_load() 650 if not self.terminate and self.load > LIMIT: 651 self.paused = True 652 else: 653 self.paused = False 654 655 self.MEM_USED=self.get_mem_usage() 656 self.server_mem=self.get_server_free_mem() 657 658 max_heap=None 659 if self.server_variables and 'max_heap_table_size' in self.server_variables: 660 max_heap=int(self.server_variables['max_heap_table_size'])/(2**20) 661 self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages'] 662 self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages'] 663 self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages'] 664 self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages'] 665 if max_heap and self.temporary_tables_size['sum']: 666 if self.temporary_tables_size['sum'] > max_heap * 0.4: 667 if self.select_window*2 < self.MAX_SELECT_WINDOW: 668 self.select_window *= 2 669 self.select_window=int(self.select_window) 670 if self.paused: 671 #printea('Hitting max temporary table size unpausing','critical') 672 self.paused = False 673 if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER: 674 #printea('Hitting max memory from server collecting and reducing window','critical') 675 if self.select_window > self.MIN_SELECT_WINDOW: 676 self.select_window/=2 677 self.select_window=int(self.select_window) 678 self.db['main'].need_wait=True 679 self.USE_MAX_QUEUES=True 680 for x in THREADS: 681 self.db[x].need_clean=True 682 gc.collect() 683 else: 684 self.db['main'].need_wait=False 685 self.USE_MAX_QUEUES=False 686 for x in THREADS: 687 self.db[x].need_clean=True 550 688 551 689 def get_config(self): 552 690 pass 691 692 def get_cpu_load(self): 693 self.loadlist.append(psutil.cpu_percent()) 694 self.loadlist=self.loadlist[1:] 695 avg=0.0 696 for x in self.loadlist: 697 avg+=x 698 return round(avg/100.0,2) 553 699 554 700 def put_config(self,key='',value=''): … … 572 718 def get_mem_usage(self): 573 719 process = psutil.Process(os.getpid()) 574 mem = process. get_memory_info()[0] / float(2 ** 20)720 mem = process.memory_info()[0] / float(2 ** 20) 575 721 return mem 722 723 def get_server_free_mem(self): 724 mem = psutil.virtual_memory() 725 return mem.free / (2 ** 20) 576 726 577 727 def print_stats(self): 578 728 global CHECK_LOAD_TIME 579 if DEBUG: 729 # if DEBUG: 730 if True: 580 731 out="Processed: " 581 732 out2='' … … 583 734 for x in THREADS: 584 735 out2+='{}={} '.format(x,self.db[x].processed) 736 self.cfg.store('processed '+x,self.db[x].processed) 585 737 if THREADED: 586 738 if x != 'main': … … 589 741 total += self.db[x].processed 590 742 out += "TOTAL={} {}".format(total,out2) 591 self.procesed_per_sec.append(int((total-self.procesed)/CHECK_LOAD_TIME)) 743 self.cfg.store('processed total',total) 744 proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME) 745 self.procesed_per_sec.append(proc_per_sec) 592 746 self.procesed_per_sec=self.procesed_per_sec[-10:] 593 747 f=lambda x,y:x+y 594 748 suma_proc=reduce(f,self.procesed_per_sec) 749 self.cfg.store('processing at',int(suma_proc/10)) 595 750 self.procesed=total 596 751 total = 0 … … 600 755 out2='' 601 756 for x in sizes: 757 self.cfg.store('queue '+x,sizes[x]) 602 758 out2+='{}={} '.format(x,sizes[x]) 603 759 total += sizes[x] 760 self.cfg.store('queue totals',total) 604 761 out+="TOTAL={} {}".format(total,out2) 605 if THREADED: 606 printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug') 607 #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 762 self.cfg.store('select window',self.select_window) 763 self.cfg.store('mem used',self.MEM_USED) 764 self.cfg.store('load',self.load) 765 if (self.paused): 766 self.cfg.store('paused',1) 608 767 else: 609 printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug') 610 #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 768 self.cfg.store('paused',0) 769 self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients'])) 770 self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages'])) 771 self.cfg.store('db_clients_size',int(self.db_tables_size['clients'])) 772 self.cfg.store('db_packages_size',int(self.db_tables_size['packages'])) 773 if DEBUG: 774 if THREADED: 775 printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info') 776 #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 777 else: 778 printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info') 779 #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED))) 611 780 612 781 def get_q_sizes(self): … … 629 798 630 799 def get_load(self): 631 nprocs=multiprocessing.cpu_count() 632 limit=nprocs*LIMIT 633 while not (self.terminate and not self.paused and self.finished): 634 self.print_stats() 635 self.load=os.getloadavg()[0] 636 if not self.terminate and self.load > limit: 637 self.paused = True 638 else: 639 self.paused = False 640 if self.all_queues_empty(): 641 gc.collect() 642 self.MEM_USED=self.get_mem_usage() 643 time.sleep(CHECK_LOAD_TIME) 800 # runs on separate thread 801 db = DB(self,'load') 802 db.init_db() 803 self.cfg = Config(db) 804 ctime=0 805 while not (self.terminate and self.finished): #and not self.paused 806 self.cfg.store('keepalive',int(time.time())) 807 time.sleep(0.5) 808 ctime+=0.5 809 self.schedule(event='CHECK_SANITY') 810 811 if ctime >CHECK_LOAD_TIME: 812 self.cfg.write() 813 ctime=0.0 814 self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients') 815 self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages') 816 self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size('Client_Versions') 817 self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size('RecvPackages') 818 self.print_stats() 819 #self.load=os.getloadavg()[0] 820 if self.all_queues_empty(): 821 gc.collect() 822 #end if 823 #end while 824 db.close_db() 644 825 645 826 def start(self): … … 652 833 self.finished = True 653 834 self.print_stats() 835 836 837 class Config(): 838 def __init__(self,connection): 839 self._db = connection 840 self.read() 841 842 def store(self,var,value): 843 var=var.replace(' ','_') 844 if isinstance(value,str): 845 if value.isnumeric(): 846 setattr(self,var,int(value)) 847 else: 848 setattr(self,var,str(value)) 849 else: 850 setattr(self,var,int(value)) 851 852 def write(self): 853 if self._db: 854 values={} 855 for x in self.get_internal_vars(): 856 values.setdefault(str(x),str(getattr(self,x))) 857 self._db.put_config(values) 858 859 def read(self): 860 if self._db: 861 config=self._db.get_config() 862 for key in config.keys(): 863 if config[key].isnumeric(): 864 setattr(self,key,int(config[key])) 865 else: 866 setattr(self,key,config[key]) 867 else: 868 print('No config yet') 869 870 def get_internal_vars(self): 871 return list(filter(lambda x : x[0] != '_',self.__dict__.keys())) 872 873 def print(self): 874 for v in self.get_internal_vars(): 875 print('{} = {}'.format(v,getattr(self,v))) 654 876 655 877 … … 669 891 670 892 if __name__ == "__main__": 893 size=len([ p.name() for p in psutil.process_iter() if p.name() == 'analyticsd' ]) 894 if size > 1: 895 printea("Another daemon running... exitting now!") 896 sys.exit(1) 671 897 if DAEMON: 672 with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0 02,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]):898 with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]): 673 899 main() 674 900 else:
Note: See TracChangeset
for help on using the changeset viewer.