Ignore:
Timestamp:
Sep 25, 2017, 6:26:11 PM (3 years ago)
Author:
mabarracus
Message:

Fixed init script installation & update procedures
Syslog logging
Fixed reconnection when mysql is not available

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd

    r5560 r5926  
    1919import daemon
    2020import lockfile
    21 
     21import logging
     22import logging.handlers
     23from logging import config
     24
     25##### START EDITABLE VARS #####
     26
     27DEBUG=False
     28DAEMON=True
    2229FILE='/usr/lib/analytics-server/analytics/config.php'
     30LIMIT=5.0
     31
     32##### END EDITABLE VARS #####
     33
     34DEBUG_PRINT_ALIVE_COUNTER=10
     35THREADED=True
     36
     37if DEBUG:
     38    loglevel=logging.DEBUG
     39else:
     40    loglevel=logging.INFO
     41
     42LOGGING = {
     43    'version': 1,
     44    'disable_existing_loggers': False,
     45    'formatters': {
     46        'verbose': {
     47            'format': '%(levelname)s %(module)s %(message)s'
     48            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
     49            },
     50        },
     51    'handlers': {
     52        'stdout': {
     53            'class': 'logging.StreamHandler',
     54            'stream': sys.stdout,
     55            'formatter': 'verbose',
     56            },
     57        'sys-logger6': {
     58            'class': 'logging.handlers.SysLogHandler',
     59            'address': '/dev/log',
     60            'facility': "local6",
     61            'formatter': 'verbose',
     62            },
     63        },
     64    'loggers': {
     65        'analyticsd-logger': {
     66            'handlers': ['sys-logger6','stdout'],
     67            'level': loglevel,
     68            'propagate': True,
     69            },
     70        }
     71    }
     72
     73
     74def printea(msg="",level='critical'):
     75    if level == 'critical':
     76        logger.critical(msg)
     77    elif level == 'info':
     78        logger.info(msg)
     79    else:
     80        logger.debug(msg)
     81
     82def keepalive(who=""):
     83    global DEBUG_PRINT_ALIVE_COUNTER
     84    t=str(int(time.time()))
     85    if DEBUG:
     86        printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
     87    else:
     88        if who == 'main':
     89            if DEBUG_PRINT_ALIVE_COUNTER > 0:
     90                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
     91            else:
     92                DEBUG_PRINT_ALIVE_COUNTER=10
     93                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
     94
     95    fp = open('/var/run/analyticsd.keepalive','w');
     96    fp.write(t)
     97    fp.close()
     98
     99config.dictConfig(LOGGING)
     100logger = logging.getLogger('analyticsd-logger')
     101
    23102DBNAME=None
    24103USER=None
     
    26105IP=None
    27106
    28 DAEMON=True
    29 LIMIT=5.0
    30 DEBUG=0
    31 THREADED=True
     107
    32108EMPTY_PAUSED_SLEEP=10
    33109CHECK_LOAD_TIME=5
     
    55131                    PASS = PASS.group(1)
    56132    if not (IP or DBNAME or USER or PASS):
    57         print "Couldn't get database configuration from {}".format(FILE)
     133        printea("Couldn't get database configuration from {}".format(FILE))
    58134    else:
    59         if DEBUG:
    60             print "Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS);
     135        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
    61136except Exception as e:
    62     print "Couldn't parse {} Error:\"{}\"".format(FILE,str(e))
     137    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
    63138    sys.exit(1)
    64139
    65140if not (IP or DBNAME or USER or PASS):
    66     print "Couldn't get database configuration from {}".format(FILE)
     141    printea("Couldn't get database configuration from {}".format(FILE))
    67142    sys.exit(1)
    68143
     
    80155        self.q=Queue.Queue()
    81156        self.processed=0
     157        printea('Database worker {} initialized'.format(t),'info')
    82158
    83159    def timed(func):
     
    85161        def wrapper(*args,**kwargs):
    86162            if TIMED_ON:
    87                 print "Start({}): @{}".format(func.__name__,time.time())
     163                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
    88164            ret=func(*args,**kwargs)
    89165            if TIMED_ON:
    90                 print "End  ({}): @{}".format(func.__name__,time.time())
     166                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
    91167            return ret
    92168        return wrapper
     
    102178                return func(*args,**kwargs)
    103179            except Exception as e:
    104                 print "Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e))
     180                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
    105181                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
    106                     print "Fatal error in ({}), max retries exceded".format(func.__name__)
     182                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
    107183                    if kwargs['mon']:
    108184                        kwargs['mon'].term()
     
    119195        @wraps(func)
    120196        def wrapper(*args,**kwargs):
    121             if DEBUG and DEBUG > 2:
    122                 if 'query' in kwargs:
    123                     print "executing query: {}".format(kwargs['query'])
     197            if 'query' in kwargs:
     198                printea("executing query: {}".format(kwargs['query']),'debug')
    124199            return func(*args,**kwargs)
    125200        return wrapper
     
    128203    def execute(self,*args,**kwargs):
    129204        if 'query' not in kwargs:
    130             print "Warning execute called whithout query"
     205            printea("Warning execute called whithout query",'info')
    131206            return None
    132207        try:
     
    141216            self.cur = self.conn.cursor()
    142217            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
    143             if DEBUG:
    144                 print "Connected succesfully {}".format(self.t)
     218            printea("Connected succesfully {}".format(self.t),'info')
    145219
    146220        except mdb.Error, e:
    147             print "Error {}: {}".format(e.args[0],e.args[1])
     221            printea("Error {}: {}".format(e.args[0],e.args[1]))
    148222            raise Exception(e)
    149223
     
    151225        if self.conn:
    152226            self.conn.close()
    153         if DEBUG:
    154             print "Closed connection {}".format(self.t)
     227            printea("Closed connection {}".format(self.t),'info')
    155228
    156229    def reduce_flavour(self,version,flavour):
     
    204277    def get_apps(self,*args,**kwargs):
    205278        if 'clients' not in kwargs:
    206             print "Warning executed without named parameter clients"
     279            printea("Warning executed without named parameter clients",'info')
    207280            return None
    208281        ret = {}
     
    292365            self.execute(query=query)
    293366            pkg_size=self.cur.fetchone()[0]
    294             if DEBUG and DEBUG > 1:
    295                 print "Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty())
     367            printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
    296368            if ainc > 65500 and self.mon.all_queues_empty() and cli_size == 0 and pkg_size == 0:
    297369                query = "TRUNCATE TABLE tmp_clients"
     
    354426
    355427    def process(self,*args,**kwargs):
     428        keepalive(self.t)
    356429        # code only for main thread
    357         if DEBUG and DEBUG > 1:
    358             print "Running thread {}".format(self.t)
     430        printea("Running thread {}".format(self.t),'debug')
    359431        if self.t == 'main' and not self.mon.terminate:
    360432            ret=self.process_main_thread(*args,**kwargs)
     
    369441            ret=self.process_all_threads(*args,**kwargs)
    370442            if ret == False: # USELESS? THREADS was died
    371                 print "Error threads"
     443                printea("Error threads")
    372444                return ret
    373445        else:
     
    380452                if self.empty and self.t == 'main':
    381453                    self.reset_autoinc()
    382                 if DEBUG and DEBUG > 1 and self.mon.paused:
    383                     print "Paused by high load {}".format(self.t)
    384                 if DEBUG and DEBUG > 1 and self.empty:
    385                     print "Empty queue {} sleeping by now".format(self.t)
     454                if self.mon.paused:
     455                    printea("Paused by high load {}".format(self.t),'debug')
     456                if self.empty:
     457                    printea("Empty queue {} sleeping by now".format(self.t),'debug')
    386458                if self.empty:
    387459                    self.empty = False
    388460                time.sleep(EMPTY_PAUSED_SLEEP)
    389461            else:
    390                 self.conn.begin()
    391462                try:
     463                    self.conn.begin()
    392464                    if self.process():
    393465                        self.conn.commit()
     
    397469                        self.conn.rollback()
    398470                except Exception as e:
    399                     self.conn.rollback()
     471                    try:
     472                        self.conn.rollback()
     473                    except:
     474                        printea("Can't rollback last actions",'info')
     475                        pass
    400476                    if e[0] != 2006:
    401                         print "Exception processing: {}".format(e)
     477                        printea("Exception processing: {}".format(e))
    402478                    else:
    403479                        if self.reconnect == 100:
    404                             print "Lost connection to database"
     480                            printea("Lost connection to database")
    405481                            self.mon.term()
    406482                        else:
    407483                            self.reconnect+=1
    408                             time.sleep(self.reconnect*2)
     484                            printea('Reconnect to mysql({}) sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
     485                            time.sleep(self.reconnect*self.reconnect)
    409486                            try:
    410487                                self.init_db()
    411488                            except:
     489                                printea('Unable to initialize worker {}'.format(self.t))
    412490                                pass
    413491
     
    440518                self.db[x].init_db()
    441519            except Exception as e:
    442                 print 'Error initializing database connections: {}'.format(str(e))
     520                printea('Error initializing database connections: {}'.format(str(e)))
    443521                sys.exit(1)
     522        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
    444523
    445524    def schedule(self,*args,**kwargs):
     
    468547
    469548    def term(self,*args,**kwargs):
    470         print "Begin kill the program, wait please..."
     549        printea("Begin kill the program, wait please...",'info')
    471550        self.terminate=True
    472551
     
    516595                out+="TOTAL={} {}".format(total,out2)
    517596            if THREADED:
    518                 sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
     597                printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug')
     598                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
    519599            else:
    520                 sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
     600                printea("{} {}SelectSize={} ProcessingAt={} Mem={}\n".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)),'debug')
     601                #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
    521602
    522603    def get_q_sizes(self):
     
    572653    m = Monitor()
    573654    m.start()
    574     print "start done"
     655    printea("start done",'info')
    575656    while not m.terminate:
    576657        time.sleep(0.5)
    577658    m.end()
    578     print "Exitting..."
     659    printea("Exitting...",'info')
    579660
    580661if __name__ == "__main__":
    581662    if DAEMON:
    582         with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analyticsd')):
     663        with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=002,pidfile=lockfile.FileLock('/var/run/analyticsd'),files_preserve=[logger.handlers[0].socket.fileno()]):
    583664            main()
    584665    else:
Note: See TracChangeset for help on using the changeset viewer.