Changeset 6812


Ignore:
Timestamp:
Feb 14, 2018, 9:36:18 AM (19 months ago)
Author:
mabarracus
Message:

Some optimizations

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd

    r6775 r6812  
    1 #!/usr/bin/python3
     1#!/usr/bin/env python3
    22
    33import time
     
    55#from multiprocessing.dummy import Pool as ThreadPool
    66from multiprocessing.dummy import Process,Lock
    7 import multiprocessing
     7#import multiprocessing
    88import sys
    99import MySQLdb as mdb
     
    3737DEBUG_PRINT_ALIVE_COUNTER=60
    3838THREADED=True
     39MAX_SLOW_CHECK_CLIENTS = 30
    3940
    4041if DEBUG:
     
    9293    if level == 'critical':
    9394        logger.critical(msg)
     95    elif level == 'error':
     96        logger.error(msg)
     97    elif level == 'warning':
     98        logger.warning(msg)
    9499    elif level == 'info':
    95100        logger.info(msg)
     
    172177        self.q=queue.Queue()
    173178        self.processed=0
    174         self.need_wait = False
    175179        self.need_clean = False
    176180        printea('Database worker {} initialized'.format(t),'info')
     
    236240            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
    237241            variables=self.check_server_variables()
     242
    238243            if variables:
    239244                self.mon.server_variables=variables
     245
    240246            printea("Connected succesfully {}".format(self.t),'info')
    241247
     
    275281            print(e)
    276282
    277     def check_temporary_tables_size(self,table_name,is_tmp=False):
     283    def check_temporary_tables_size(self,table_name):
    278284        if self.t != 'load':
    279285            return None
     
    281287            size = 0
    282288            rows = 0
    283             if is_tmp:
    284                 self.cur.execute('select floor((data_length + index_length - data_free) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
    285             else:
    286                 self.cur.execute('select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
     289            self.cur.execute('select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
    287290
    288291            res=self.cur.fetchone()
     
    433436            self.execute(query=query)
    434437            ainc = self.cur.fetchone()[0]
    435             query = "SELECT count(*) from tmp_clients"
    436             self.execute(query=query)
    437             cli_size=self.cur.fetchone()[0]
    438             query = "SELECT count(*) from tmp_packages"
    439             self.execute(query=query)
    440             pkg_size=self.cur.fetchone()[0]
    441             printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
    442             if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0:
     438            #query = "SELECT count(*) from tmp_clients"
     439            #self.execute(query=query)
     440            #cli_size=self.cur.fetchone()[0]
     441            #query = "SELECT count(*) from tmp_packages"
     442            #self.execute(query=query)
     443            #pkg_size=self.cur.fetchone()[0]
     444            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
     445            if ainc > 65500 and self.mon.all_queues_empty():
     446            #and cli_size == 0 and pkg_size == 0:
    443447                query = "TRUNCATE TABLE tmp_clients"
    444448                self.execute(query=query)
     
    452456    @timed
    453457    def process_main_thread(self,*args,**kwargs):
    454         if self.need_wait:
    455             time.sleep(EMPTY_PAUSED_SLEEP)
     458        if self.mon.slow_check_clients > 0 :
     459            self.mon.slow_check_clients -= 1
     460            time.sleep(1)
     461            return True
     462        else:
     463            self.mon.slow_check_clients = self.mon.default_slow_check_clients
     464
    456465        clis=self.get_client(mon=self.mon)
    457466        if clis == True: #No clients found (empty)
     
    506515        printea("Running thread {}".format(self.t),'debug')
    507516        if self.t == 'main' and not self.mon.terminate:
    508            
    509517            ret=self.process_main_thread(*args,**kwargs)
    510518            if ret == False: #No more clients available
     
    530538        while not (self.mon.terminate and self.empty):
    531539            if self.mon.paused or self.empty:
    532                 if self.empty and self.t == 'main':
    533                     self.reset_autoinc()
     540                #if self.empty and self.t == 'main':
     541                #    self.reset_autoinc()
    534542                if self.mon.paused:
    535543                    printea("Paused by high load {}".format(self.t),'debug')
     
    542550                try:
    543551                    if self.conn == None:
    544                         raise Exception('Connection not available, probably it\'s a first run')
     552                        raise EnvironmentError('Connection not available, probably it\'s a first run')
    545553                    else:
    546554                        self.conn.begin()
     
    553561                    else:
    554562                        self.conn.rollback()
     563                except EnvironmentError as e:
     564                    printea(e)
     565                    self.init_db()
     566
    555567                except Exception as e:
    556568                    try:
     
    564576                    #if e[0] == 2006: # SERVER GONE AWAY
    565577                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
     578
    566579                    printea("Trying to recover connection ({})".format(self.t))
    567580                    if self.reconnect == 100:
     
    572585                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
    573586                        time.sleep(self.reconnect*self.reconnect)
     587
    574588                        try:
    575589                            self.init_db()
     
    583597    def __init__(self):
    584598        self.MAX_QUEUE_UTILIZATION = 100
    585         self.USE_MAX_QUEUES = False
     599        self.USE_MAX_QUEUES = True
    586600        self.MAX_SELECT_WINDOW = (2 ** 13) +1
    587601        self.MIN_SELECT_WINDOW = 32
     
    598612        self.procesed = 0
    599613        self.procesed_per_sec = [0]*10
    600         self.load = False
     614        self.load = 0
    601615        self.server_variables = None  # initialized by main worker
    602616        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
     
    604618        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
    605619        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
    606 
    607         self.server_mem = None
     620        self.default_slow_check_clients = 1
     621        self.slow_check_clients = self.default_slow_check_clients
     622        self.server_mem = 0
    608623        self.loadlist = [ 0.0 ] * 100
     624        self.max_heap = None
     625
    609626        signal.signal(signal.SIGQUIT,self.term)
    610627        signal.signal(signal.SIGTERM,self.term)
     
    623640        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
    624641
    625     def schedule(self,*args,**kwargs):
    626         if kwargs['event']=='NO_MORE_CLIENTS':
     642    def windowctl(self, *args, **kwargs):
     643        if args[0] == '+':
     644            if self.select_window*2 < self.MAX_SELECT_WINDOW:
     645                self.select_window*=2
     646                self.select_window=int(self.select_window)
     647        if args[0] == '-':
    627648            if self.select_window > self.MIN_SELECT_WINDOW:
    628649                self.select_window/=2
    629650                self.select_window=int(self.select_window)
    630         if kwargs['event']=='HAVE_CLIENTS':
     651
     652    def slowcheckctl(self, *args, **kwargs):
     653        if args[0] == 'reset':
     654            self.default_slow_check_clients = 0
     655        if args[0] == '+':
     656            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
     657                if self.default_slow_check_clients == 0:
     658                    self.default_slow_check_clients = 1
     659                else:
     660                    self.default_slow_check_clients = self.default_slow_check_clients * 2
     661
     662
     663    def schedule(self, *args, **kwargs):
     664        if kwargs['event'] == 'NO_MORE_CLIENTS':
     665            self.windowctl('-')
     666            self.slowcheckctl('+')
     667
     668        if kwargs['event'] == 'HAVE_CLIENTS':
     669
    631670            if kwargs['nclients'] == self.select_window:
    632                 if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
    633                     if self.select_window > self.MIN_SELECT_WINDOW:
    634                         self.select_window/=2
    635                         self.select_window=int(self.select_window)
    636                 if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
    637                     if self.select_window > self.MIN_SELECT_WINDOW:
    638                         self.select_window/=2
    639                         self.select_window=int(self.select_window)
    640                 else:
    641                     if self.select_window*2 < self.MAX_SELECT_WINDOW:
    642                         self.select_window*=2
    643                         self.select_window=int(self.select_window)
    644             elif kwargs['nclients']<self.select_window/2:
    645                 self.select_window/=2
    646                 self.select_window=int(self.select_window)
     671                self.windowctl('+')
     672            else:
     673                self.windowctl('-')
     674            self.slowcheckctl('reset')
     675#
     676#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
     677#                    self.windowctl('+')
     678#
     679#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
     680#                    self.windowctl('-')
     681#                    self.slowcheckctl('+')
     682#                else:
     683#                    self.windowctl('+')
     684#
     685#            elif kwargs['nclients'] < self.select_window/2:
     686#                self.windowctl('-')
     687#
     688#            self.slowcheckctl('reset')
    647689
    648690        if kwargs['event']=='CHECK_SANITY':
    649             #nprocs=multiprocessing.cpu_count()
    650             #limit=nprocs*LIMIT
    651 
    652             self.load=self.get_cpu_load()
     691            # CPU SETTINGS
    653692            if not self.terminate and self.load > LIMIT:
    654693                self.paused = True
     
    656695                self.paused = False
    657696
    658             self.MEM_USED=self.get_mem_usage()
    659             self.server_mem=self.get_server_free_mem()
    660 
    661             max_heap=None
    662             if self.server_variables and 'max_heap_table_size' in self.server_variables:
    663                 max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
    664             self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
    665             self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
    666             self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
    667             self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
    668             if max_heap and self.temporary_tables_size['sum']:
    669                 if self.temporary_tables_size['sum'] > max_heap * 0.4:
    670                     if self.select_window*2 < self.MAX_SELECT_WINDOW:
    671                         self.select_window *= 2
    672                         self.select_window=int(self.select_window)
     697            # DB MEM SETTINGS
     698            if self.max_heap and self.temporary_tables_size['sum']:
     699                if self.temporary_tables_size['sum'] > self.max_heap * 0.4:
     700                    self.windowctl('+')
    673701                    if self.paused:
    674702                        #printea('Hitting max temporary table size unpausing','critical')
    675703                        self.paused = False
     704            # SERVER MEM
    676705            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
    677706                #printea('Hitting max memory from server collecting and reducing window','critical')
    678                 if self.select_window > self.MIN_SELECT_WINDOW:
    679                     self.select_window/=2
    680                     self.select_window=int(self.select_window)
    681                 self.db['main'].need_wait=True
     707                self.windowctl('-')
     708                self.slowcheckctl('+')
    682709                self.USE_MAX_QUEUES=True
    683710                for x in THREADS:
     
    685712                gc.collect()
    686713            else:
    687                 self.db['main'].need_wait=False
    688                 self.USE_MAX_QUEUES=False
     714#                self.USE_MAX_QUEUES=False
    689715                for x in THREADS:
    690                     self.db[x].need_clean=True
    691 
    692     def get_config(self):
    693         pass
     716                    self.db[x].need_clean=False
     717
     718            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
     719                self.windowctl('-')
     720                self.slowcheckctl('+')
     721
     722            # QUEUES
     723            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
     724                self.windowctl('+')
     725
    694726
    695727    def get_cpu_load(self):
     
    808840        while not (self.terminate and self.finished): #and not self.paused
    809841            self.cfg.store('keepalive',int(time.time()))
    810             time.sleep(0.5)
    811             ctime+=0.5
     842            time.sleep(1)
     843            self.load=self.get_cpu_load()
     844            ctime+=1
    812845            self.schedule(event='CHECK_SANITY')
    813 
    814846            if ctime >CHECK_LOAD_TIME:
    815847                self.cfg.write()
    816848                ctime=0.0
    817                 self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients',True)
    818                 self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages',True)
     849
     850                self.MEM_USED=self.get_mem_usage()
     851                self.server_mem=self.get_server_free_mem()
     852
     853                if self.server_variables and 'max_heap_table_size' in self.server_variables:
     854                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
     855
     856                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size('tmp_clients')
     857                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size('tmp_packages')
    819858                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size('Client_Versions')
    820859                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size('RecvPackages')
     860                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
     861                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
     862                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
     863                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
     864                qempty = self.all_queues_empty()
     865                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
     866                    db.reset_autoinc()
    821867                self.print_stats()
    822868                #self.load=os.getloadavg()[0]
    823                 if self.all_queues_empty():
     869                if qempty:
    824870                    gc.collect()
    825871            #end if
     
    898944        printea("Another daemon running... exitting now!")
    899945        sys.exit(1)
     946    lck = '/var/run/analyticsd.lock'
    900947    if DAEMON:
    901         with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]):
    902             main()
     948        if os.path.isfile(lck):
     949            printea('Lockfile {} detected, unable to start'.format(lck),'error')
     950            sys.exit(1)
     951        else:
     952            try:
     953                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
     954                    main()
     955            except Exception as e:
     956                printea(e)
     957                sys.exit(1)
    903958    else:
    904959        main()
Note: See TracChangeset for help on using the changeset viewer.