source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 7154

Last change on this file since 7154 was 7154, checked in by mabarracus, 19 months ago

Disable debugging flags

  • Property svn:executable set to *
File size: 41.4 KB
Line 
1#!/usr/bin/env python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7#import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34WARNING_MIN_HEAP = 256
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39MAX_SLOW_CHECK_CLIENTS = 30
40USE_FILE_EVENT_LOG=False
41USE_FILE_EVENT_DB=False
42USE_CONFIG_FILE_LOG=False
43CONFIG_FILE_LOG='/var/run/analyticsd_config.log'
44FILE_EVENT_LOG='/var/run/analyticsd_event.log'
45
46if DEBUG:
47    if MIN_LOG_LEVEL:
48        loglevel=MIN_LOG_LEVEL
49    else:
50        loglevel=logging.DEBUG
51else:
52    loglevel=logging.INFO
53
54LOGGING = {
55    'version': 1,
56    'disable_existing_loggers': False,
57    'formatters': {
58        'verbose': {
59            'format': '%(levelname)s %(module)s %(message)s'
60            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
61            },
62        },
63    'handlers': {
64        'stdout': {
65            'class': 'logging.StreamHandler',
66            'stream': sys.stdout,
67            'formatter': 'verbose',
68            },
69        'sys-logger6': {
70            'class': 'logging.handlers.SysLogHandler',
71            'address': '/dev/log',
72            'facility': "local6",
73            'formatter': 'verbose',
74            },
75        },
76    'loggers': {
77        'analyticsd-logger': {
78            'handlers': ['sys-logger6','stdout'],
79            'level': loglevel,
80            'propagate': True,
81            },
82        }
83    }
84
85def to_str(x):
86    if isinstance(x,str):
87        return to_utf(x).decode('unicode_escape')
88    else:
89        return x
90
91def to_utf(x):
92    if isinstance(x,str):
93        return x.encode('utf-8')
94    else:
95        return x
96
97def printea(msg="",level='critical'):
98    if level == 'critical':
99        logger.critical(msg)
100    elif level == 'error':
101        logger.error(msg)
102    elif level == 'warning':
103        logger.warning(msg)
104    elif level == 'info':
105        logger.info(msg)
106    else:
107        logger.debug(msg)
108
109def keepalive(who=""):
110    global DEBUG_PRINT_ALIVE_COUNTER
111    t=str(int(time.time()))
112    if DEBUG:
113        pass
114        # too much verbose
115        # printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
116    else:
117        if who == 'main':
118            if DEBUG_PRINT_ALIVE_COUNTER > 0:
119                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
120            else:
121                DEBUG_PRINT_ALIVE_COUNTER=60
122                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
123
124    fp = open('/var/run/analyticsd.keepalive','w');
125    fp.write(t)
126    fp.close()
127
128config.dictConfig(LOGGING)
129logger = logging.getLogger('analyticsd-logger')
130
131DBNAME=None
132USER=None
133PASS=None
134IP=None
135
136EMPTY_PAUSED_SLEEP=1
137CHECK_LOAD_TIME=5
138MAX_RETRIES=5
139TIMED_ON=False
140
141try:
142    with open(FILE,'r') as f:
143        for line in f:
144            if not IP:
145                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
146                if IP:
147                    IP = IP.group(1)
148            if not DBNAME:
149                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
150                if DBNAME:
151                    DBNAME = DBNAME.group(1)
152            if not USER:
153                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
154                if USER:
155                    USER = USER.group(1)
156            if not PASS:
157                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
158                if PASS:
159                    PASS = PASS.group(1)
160    if not (IP or DBNAME or USER or PASS):
161        printea("Couldn't get database configuration from {}".format(FILE))
162    else:
163        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
164except Exception as e:
165    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
166    sys.exit(1)
167
168if not (IP or DBNAME or USER or PASS):
169    printea("Couldn't get database configuration from {}".format(FILE))
170    sys.exit(1)
171
172if THREADED:
173    THREADS=['main','server','client','desktop','other']
174else:
175    THREADS=['main']
176
177class DB():
178    def __init__(self,mon,t='main'):
179        self._initialized=False
180        self.t=t
181        self.reconnect=0
182        self.empty = False
183        self.conn=None
184        self.mon=mon
185        self.q=queue.Queue()
186        self.processed=0
187        self.need_clean = False
188        printea('Database worker {} initialized'.format(t),'info')
189
190    def timed(func):
191        @wraps(func)
192        def wrapper(*args,**kwargs):
193            if TIMED_ON:
194                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
195            ret=func(*args,**kwargs)
196            if TIMED_ON:
197                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
198            return ret
199        return wrapper
200
201    def with_retry(func):
202        @wraps(func)
203        def wrapper(*args,**kwargs):
204            if 'retry' not in kwargs:
205                kwargs['retry']=1
206            if 'mon' not in kwargs:
207                kwargs['mon'] = None
208            try:
209                return func(*args,**kwargs) 
210            except Exception as e:
211                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
212                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
213                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
214                    if kwargs['mon']:
215                        kwargs['mon'].term()
216                        sys.exit(1)
217                    return None
218                else:
219                    time.sleep(kwargs['retry']**2)
220                    kwargs['retry']+=1
221                    return wrapper(*args,**kwargs)
222            return result
223        return wrapper
224
225    def with_debug(func):
226        @wraps(func)
227        def wrapper(*args,**kwargs):
228            if 'query' in kwargs:
229                printea("executing query: {}".format(kwargs['query']),'debug')
230            return func(*args,**kwargs)
231        return wrapper
232
233    @with_debug
234    def execute(self,*args,**kwargs):
235        if 'query' not in kwargs:
236            printea("Warning execute called whithout query",'info')
237            return None
238        try:
239            return self.cur.execute(kwargs['query'])
240        except mdb.OperationalError as e:
241                printea('({}) Operational error on mysql, error is: {}'.format(self.t,e),'error')
242                self.init_db()
243        except Exception as e:
244                raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
245
246    def init_db(self):
247        try:
248            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
249            self.conn.autocommit(False)
250            self.cur = self.conn.cursor()
251            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
252            variables=self.check_server_variables()
253
254            if variables:
255                self.mon.server_variables=variables
256
257            printea("Connected succesfully {} thread_id={}".format(self.t,self.conn.thread_id()),'info')
258
259        except mdb.Error as e:
260            printea("Error {}: {}".format(e.args[0],e.args[1]))
261            raise Exception(e)
262
263    def check_server_variables(self):
264        if self.t != 'main':
265            return None
266        else:
267            printea('Getting server variables','info')
268            result = {}
269            self.cur.execute("show global variables")
270            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
271            if self.cur.rowcount > 0:
272                for i in range(self.cur.rowcount):
273                    var_name, var_value = self.cur.fetchone()
274                    result.setdefault(var_name,var_value)
275            return result
276
277    def get_config(self):
278        values={}
279        self.cur.execute('select name,value from Config')
280        for val in self.cur.fetchall():
281            values.setdefault(val[0],val[1])
282        return values
283
284    @with_retry
285    def put_config(self,*args,**kwargs):
286        values = kwargs.get('values')
287        vals=[]
288        for x in values.keys():
289            vals.append("('{}','{}')".format(x,values[x]))
290        try:
291            self.execute(query="insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
292            self.conn.commit()
293        except Exception as e:
294            printea(e,'error')
295
296    @with_retry
297    def check_temporary_tables_size(self,*args,**kwargs):
298        table_name=kwargs.get('table_name')
299        if self.t != 'load':
300            return None
301        else:
302            size = 0
303            rows = 0
304            self.execute(query='select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
305
306            res=self.cur.fetchone()
307            size = float(res[0])
308            rows = int(res[1])
309            return (int(size),rows)
310
311    def close_db(self):
312        if self.conn:
313            self.conn.close()
314            printea("Closed connection {}".format(self.t),'info')
315
316    def reduce_flavour(self,version,flavour):
317        if version == '15':
318            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
319                return 'desktop'
320            elif 'server' in flavour:
321                return 'server'
322            elif 'client' in flavour:
323                return 'client' 
324        elif version == '16':
325            if 'server' in flavour:
326                return 'server'
327            elif 'client' in flavour:
328                return 'client'
329            elif 'desktop' in flavour:
330                return 'desktop'
331        return 'other'
332
333    def reduce_version(self,version):
334        if version[0:2] in ['15','16']:
335            return version[0:2]
336        else:
337            return 'other'
338
339    def gen_uuid(self,*args,**kwargs):
340        try:
341            string=to_utf(u'-'.join([str(x) for x in args]))
342            h=hashlib.sha1(string)
343            return int(h.hexdigest()[0:16],16)
344        except Exception as e:
345            printea(traceback.format_exc(),'critical')
346
347    @timed
348    @with_retry
349    def get_client(self,*args,**kwargs):
350        try:
351            query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu,ltsp,mode from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
352            self.execute(query=query)
353            ret =[]
354            if self.cur.rowcount > 0:
355                for i in range(self.cur.rowcount):
356                    v_id,v_date,v_user,v_version,v_flavour,v_arch,v_mem,v_vga,v_cpu,v_ncpu,v_ltsp,v_mode=self.cur.fetchone()
357                    version=self.reduce_version(v_version)
358                    flavour=self.reduce_flavour(version,v_flavour)
359                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
360                    if not v_arch:
361                        v_arch = 'NULL'
362                    if not v_mem:
363                        v_mem = 'NULL'
364                    if not v_vga:
365                        v_vga = 'NULL'
366                    if not v_cpu:
367                        v_cpu = 'NULL'
368                    if not v_ncpu:
369                        v_ncpu = 'NULL'
370                    if v_ltsp == '1' or v_ltsp == True:
371                        v_ltsp = 'TRUE'
372                    elif v_ltsp == '0' or v_ltsp == False:
373                        v_ltsp = 'FALSE'
374                    else:
375                        v_ltsp = 'NULL'
376                    if not v_mode:
377                        v_mode='NULL'
378                    else:
379                        v_mode="'"+v_mode+"'"
380
381                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour,'arch':v_arch,'mem':v_mem,'vga':v_vga,'cpu':v_cpu,'ncpu':v_ncpu,'ltsp':v_ltsp,'mode':v_mode})
382                return ret
383            else:
384                return True
385        except Exception as e:
386            raise Exception("Error getting client: {}".format(e))
387
388    @timed
389    @with_retry
390    def get_apps(self,*args,**kwargs):
391        if 'clients' not in kwargs:
392            printea("Warning executed without named parameter clients",'info')
393            return None
394        ret = {}
395        try:
396            cids_to_rel_fla={}
397            for c in kwargs['clients']:
398                if c['id'] not in cids_to_rel_fla:
399                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
400                if c['flavour'] not in ret:
401                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
402            for c in kwargs['clients']:
403                ret[c['flavour']]['clients'].append(c)
404
405            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
406            self.execute(query=query)
407            if self.cur.rowcount > 0:
408                for i in range(self.cur.rowcount):
409                    row = self.cur.fetchone()
410                    clid = row[0]
411                    rel,fla = cids_to_rel_fla[clid]
412                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
413                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
414            return ret
415        except Exception as e:
416            raise Exception("Error getting apps: {}".format(e))
417
418    @timed
419    @with_retry
420    def put_client(self,*args,**kwargs):
421        if 'client_list' not in kwargs:
422            raise Exception("Called without named parameter client_list")
423        values= []
424        for cli in kwargs['client_list']:
425            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{},{},{})".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu'],cli['ltsp'],cli['mode']))
426        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`,`ltsp`,`mode`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
427        self.execute(query=query)
428        return True
429
430    @timed
431    @with_retry
432    def put_apps(self,*args,**kwargs):
433        if 'apps' not in kwargs:
434            raise Exception("Called without named parameter apps")
435        app_list = {}
436        for app in kwargs['apps']:
437            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
438            if str(app['uuid']) not in app_list:
439                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
440            else:
441                app_list[str(app['uuid'])]['value']+=app['value']
442        values = []
443        for app in app_list:
444            item=app_list[app]
445            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
446        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
447        self.execute(query=query)
448        return True
449
450    @timed
451    @with_retry
452    def del_client(self,*args,**kwargs):
453        if 'client_list' not in kwargs:
454            raise Exception("Called without named parameter client_list")
455        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
456        self.execute(query=query)
457        return True
458
459    @timed
460    @with_retry
461    def del_apps(self,*args,**kwargs):
462        if 'client_list' not in kwargs:
463            raise Exception("Called without named parameter client_list")
464        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
465        self.execute(query=query)
466        return True
467
468    @timed
469    def reset_autoinc(self):
470        try:
471            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
472            self.execute(query=query)
473            ainc = self.cur.fetchone()[0]
474            #query = "SELECT count(*) from tmp_clients"
475            #self.execute(query=query)
476            #cli_size=self.cur.fetchone()[0]
477            #query = "SELECT count(*) from tmp_packages"
478            #self.execute(query=query)
479            #pkg_size=self.cur.fetchone()[0]
480            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
481            if ainc > 65500 and self.mon.all_queues_empty():
482            #and cli_size == 0 and pkg_size == 0:
483                query = "TRUNCATE TABLE tmp_clients"
484                self.execute(query=query)
485                query = "TRUNCATE TABLE tmp_packages"
486                self.execute(query=query)
487            return True
488        except Exception as e:
489            raise Exception("Error reseting auto_increment: {}".format(e))
490
491
492    @timed
493    def process_main_thread(self,*args,**kwargs):
494        if self.mon.slow_check_clients > 0 :
495            self.mon.slow_check_clients -= 1
496            time.sleep(1)
497            return True
498        else:
499            self.mon.slow_check_clients = self.mon.default_slow_check_clients
500
501        clis=self.get_client(mon=self.mon)
502        if clis == True: #No clients found (empty)
503            self.empty = True # if > 65500 auto_increment was reset
504            self.mon.schedule(event='NO_MORE_CLIENTS')
505            return False
506        #clients found
507        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
508        # if clients found get apps
509        lapps = self.get_apps(clients=clis,mon=self.mon)
510        #lapps can be empty
511        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
512        #If deletion was failed , thread was died
513        lapps_tmp={'apps':[],'clients':[]}
514        for fla in lapps:
515            if THREADED:
516                lapps[fla]['timestamp']=time.time()
517                self.mon.db[fla].q.put(lapps[fla],True)
518            else:
519                lapps_tmp['apps'].extend(lapps[fla]['apps'])
520                lapps_tmp['clients'].extend(lapps[fla]['clients'])
521                self.mon.db['main'].q.put(lapps_tmp,True)
522        #if DEBUG:
523        self.processed+=len(clis)
524        return True
525
526    @timed
527    def process_all_threads(self,*args,**kwargs):
528        lapps=self.q.get(True)
529        #print "Running {}".format(self.t)
530        if THREADED:
531            while (lapps['timestamp'] > self.mon.commited):
532                time.sleep(0.001)
533        if len(lapps['clients']) != 0:
534            printea('Thread {} putting client'.format(self.t),'debug')
535            #IF FAIL, AFTER RETRIES THREAD DIES
536            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
537                self.q.put(lapps,True) # USELESS
538                return False    # USELESS
539        if len(lapps['apps']) != 0:
540            printea('Thread {} putting clientapps'.format(self.t),'debug')
541            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
542                self.q.put(lapps,True) # USELESS
543                return False    # USELESS
544            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
545                self.q.put(lapps,True) # USELESS
546                return False    # USELESS
547        #if DEBUG:
548        self.processed+=len(lapps['clients'])
549        return True
550
551    def process(self,*args,**kwargs):
552        keepalive(self.t)
553        # warning too much verbose, printea("Running thread {}".format(self.t),'debug')
554        if self.t == 'main' and not self.mon.terminate:
555            ret=self.process_main_thread(*args,**kwargs)
556            if ret == False: #No more clients available
557                return True
558            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
559                if THREADED:
560                    return True #Need return to avoid put empty flag and wait into main thread
561
562        #after this poing, code for all threads
563        if not self.q.empty():
564            ret=self.process_all_threads(*args,**kwargs)
565            if ret == False: # USELESS? THREADS was died
566                printea("Error threads")
567                return ret
568        else: 
569            del self.q
570            self.q = queue.Queue()
571            self.empty = True
572        return True
573
574    def worker(self):
575        printea("Starting worker {} processing".format(self.t),'info')
576        while not (self.mon.terminate and self.empty):
577            if self.mon.paused or self.empty:
578                #if self.empty and self.t == 'main':
579                #    self.reset_autoinc()
580                if self.mon.paused:
581                    printea("Paused by high load {}".format(self.t),'debug')
582                # Too much verbose
583                #if self.empty:
584                #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
585                if self.empty:
586                    self.empty = False
587                time.sleep(EMPTY_PAUSED_SLEEP)
588            else:
589                try:
590                    if self.conn == None:
591                        if not self._initialized:
592                            self._initialized = True
593                            raise EnvironmentError('Initializing connector {}'.format(self.t))
594                        else:
595                            raise Warning('Connection not available')
596                    else:
597                        self.conn.begin()
598                    if self.process():
599                        self.conn.commit()
600                        self.mon.commited=time.time()
601                        self.reconnect = 0
602                        if self.need_clean:
603                            gc.collect()
604                    else:
605                        self.conn.rollback()
606                except EnvironmentError as e:
607                    printea(e,'info')
608                    self.init_db()
609                except Warning as e:
610                    printea(e,'warning')
611                    self.init_db()
612                except Exception as e:
613                    try:
614                        if self.conn != None:
615                            self.conn.rollback()
616                    except:
617                        printea("Can't rollback last actions",'info')
618                        pass
619                    #if e[0] != 2006:
620                    #    printea("Exception processing worker({}): {}".format(self.t,e))
621                    #if e[0] == 2006: # SERVER GONE AWAY
622                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
623
624                    printea("Trying to recover connection ({}), {}".format(self.t,e))
625                    if self.reconnect == 100:
626                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
627                        self.mon.term()
628                    else:
629                        self.reconnect+=1
630                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
631                        time.sleep(self.reconnect*self.reconnect)
632
633                        try:
634                            self.init_db()
635                            printea("Recovered worker {} connection".format(self.t),'info')
636                        except:
637                            printea('Unable to initialize worker {}'.format(self.t))
638                            pass
639
640
641class Monitor():
642    def __init__(self):
643        self.MAX_QUEUE_UTILIZATION = 100
644        self.USE_MAX_QUEUES = True
645        self.MAX_SELECT_WINDOW = (2 ** 13) +1
646        self.MIN_SELECT_WINDOW = 32
647        self.MEM_USED=0
648        self.MAX_MEM=512
649        self.MIN_FREE_MEM_SERVER=100
650        self.USE_MAX_MEM=True
651        self.lock = Lock()
652        self.terminate = False
653        self.finished = False
654        self.paused = False
655        self.select_window = self.MIN_SELECT_WINDOW
656        self.commited = time.time()
657        self.procesed = 0
658        self.procesed_per_sec = [0]*10
659        self.load = 0
660        self.server_variables = None  # initialized by main worker
661        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
662        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
663        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
664        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
665        self.default_slow_check_clients = 1
666        self.slow_check_clients = self.default_slow_check_clients
667        self.server_mem = 0
668        self.loadlist = [ 0.0 ] * 100
669        self.max_heap = None
670        self.heap_alert_count = 0
671        self.last_events = []
672
673        signal.signal(signal.SIGQUIT,self.term)
674        signal.signal(signal.SIGTERM,self.term)
675        signal.signal(signal.SIGINT,self.term)
676
677        self.db = {}
678        self.threads = {}
679        for x in THREADS:
680            self.db[x] = DB(self,x)
681            #try:
682            #    self.db[x].init_db()
683            #except Exception as e:
684            #    printea('Error initializing database connections: {}'.format(str(e)))
685            #    sys.exit(1)
686        self.cfg= None
687        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
688
689    def windowctl(self, *args, **kwargs):
690        if args[0] == '+':
691            if self.select_window*2 < self.MAX_SELECT_WINDOW:
692                self.select_window*=2
693                self.select_window=int(self.select_window)
694        if args[0] == '-':
695            if self.select_window > self.MIN_SELECT_WINDOW:
696                self.select_window/=2
697                self.select_window=int(self.select_window)
698
699    def slowcheckctl(self, *args, **kwargs):
700        if args[0] == 'reset':
701            self.default_slow_check_clients = 0
702        if args[0] == '+':
703            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
704                if self.default_slow_check_clients == 0:
705                    self.default_slow_check_clients = 1
706                else:
707                    self.default_slow_check_clients = self.default_slow_check_clients * 2
708        if args[0] == '-':
709            if self.default_slow_check_clients > 0:
710                self.default_slow_check_clients = self.default_slow_check_clients / 2
711
712    def append_event_log(self,*args,**kwargs):
713        if DEBUG:
714            if USE_FILE_EVENT_LOG:
715                try:
716                    fp = open(FILE_EVENT_LOG,'a')
717                except:
718                    fp = False
719            else:
720                fp = False
721            separator=':'
722            for x in args:
723                the_time=str(int(time.time()))
724                the_value=str(x)
725                the_string=the_time+separator+the_value
726                self.last_events.append(the_string)
727                if fp:
728                    fp.write('{}\n'.format(the_string))
729            if fp:
730                fp.close()
731
732    def schedule(self, *args, **kwargs):
733        if kwargs['event'] == 'NO_MORE_CLIENTS':
734            self.append_event_log(kwargs['event'])
735            self.windowctl('-')
736            self.slowcheckctl('+')
737
738        if kwargs['event'] == 'HAVE_CLIENTS':
739            self.append_event_log(kwargs['event'])
740            if kwargs['nclients'] == self.select_window:
741                self.windowctl('+')
742            else:
743                self.windowctl('-')
744            self.slowcheckctl('reset')
745#
746#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
747#                    self.windowctl('+')
748#
749#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
750#                    self.windowctl('-')
751#                    self.slowcheckctl('+')
752#                else:
753#                    self.windowctl('+')
754#
755#            elif kwargs['nclients'] < self.select_window/2:
756#                self.windowctl('-')
757#
758#            self.slowcheckctl('reset')
759
760        if kwargs['event']=='CHECK_SANITY':
761            # CPU SETTINGS
762            if not self.terminate and self.load > LIMIT:
763                self.append_event_log('LOAD_LIMIT_REACHED')
764                self.paused = True
765            else:
766                self.paused = False
767
768            # DB MEM SETTINGS
769            if self.max_heap and self.temporary_tables_size['sum']:
770                if self.temporary_tables_size['sum'] > self.max_heap * 0.3:
771                    self.heap_alert_count += 1
772                    self.append_event_log('MAX_HEAP_ALERT')
773                    self.windowctl('+')
774                    self.slowcheckctl('-')
775                    if self.heap_alert_count > 50:
776                        self.append_event_log('EXTREME_HEAP_RULES')
777                        self.slowcheckctl('reset')
778                    if self.paused:
779                        #printea('Hitting max temporary table size unpausing','critical')
780                        self.paused = False
781                else:
782                    self.heap_alert_count=0
783            # SERVER MEM
784            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
785                self.append_event_log('SERVER_MEM_ALERT')
786                #printea('Hitting max memory from server collecting and reducing window','critical')
787                self.windowctl('-')
788                self.slowcheckctl('+')
789                self.USE_MAX_QUEUES=True
790                for x in THREADS:
791                    self.db[x].need_clean=True
792                gc.collect()
793            else:
794#                self.USE_MAX_QUEUES=False
795                for x in THREADS:
796                    self.db[x].need_clean=False
797
798            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
799                self.append_event_log('MAX_MEM_ALERT')
800                self.windowctl('-')
801                self.slowcheckctl('+')
802
803            # QUEUES
804            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
805                self.append_event_log('MAX_QUEUES_REACHED')
806                self.windowctl('+')
807
808
809    def get_cpu_load(self):
810        self.loadlist.append(psutil.cpu_percent())
811        self.loadlist=self.loadlist[1:]
812        avg=0.0
813        for x in self.loadlist:
814            avg+=x
815        return round(avg/100.0,2)
816
817    def term(self,*args,**kwargs):
818        printea("Begin kill the program, wait please...",'info')
819        self.terminate=True
820
821    def prepare_threads(self):
822        global DAEMON
823
824        self.threads['load']=Process(target=self.get_load)
825        self.threads['load'].daemon = DAEMON
826        self.threads['load'].start()
827        for x in THREADS:
828            self.threads[x]=Process(target=self.db[x].worker)
829            self.threads[x].daemon = DAEMON
830            self.threads[x].start()
831
832    def get_mem_usage(self):
833        process = psutil.Process(os.getpid())
834        try:
835            mem = process.memory_info()[0] / float(2 ** 20)
836        except:
837            mem = process.get_memory_info()[0] / float(2 ** 20)
838        return mem
839
840    def get_server_free_mem(self):
841        mem = psutil.virtual_memory()
842        return mem.free / (2 ** 20)
843
844    def print_stats(self):
845        global CHECK_LOAD_TIME
846#        if DEBUG:
847        if True:
848            out="Processed: "
849            out2=''
850            total=0
851            for x in THREADS:
852                out2+='{}={} '.format(x,self.db[x].processed)
853                self.cfg.store('processed '+x,self.db[x].processed)
854                if THREADED:
855                    if x != 'main':
856                        total += self.db[x].processed
857                else:
858                    total += self.db[x].processed
859            out += "TOTAL={} {}".format(total,out2)
860            self.cfg.store('processed total',total)
861            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
862            self.procesed_per_sec.append(proc_per_sec)
863            self.procesed_per_sec=self.procesed_per_sec[-10:]
864            f=lambda x,y:x+y
865            suma_proc=reduce(f,self.procesed_per_sec)
866            self.cfg.store('processing at',int(suma_proc/10))
867            self.procesed=total
868            total = 0
869            if THREADED:
870                out+="Queues: "
871                sizes=self.get_q_sizes()
872                out2=''
873                for x in sizes:
874                    self.cfg.store('queue '+x,sizes[x])
875                    out2+='{}={} '.format(x,sizes[x])
876                    total += sizes[x]
877                self.cfg.store('queue totals',total)
878                out+="TOTAL={} {}".format(total,out2)
879            self.cfg.store('select window',self.select_window)
880            self.cfg.store('mem used',self.MEM_USED)
881            self.cfg.store('load',self.load)
882            if (self.paused):
883                self.cfg.store('paused',1)
884            else:
885                self.cfg.store('paused',0)
886            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
887            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
888            self.cfg.store('temp_clients_rows',int(self.temporary_tables_rows['clients']))
889            self.cfg.store('temp_packages_rows',int(self.temporary_tables_rows['packages']))
890            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
891            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
892            self.cfg.store('default_slow_check_clients',int(self.default_slow_check_clients))
893            self.cfg.store('slow_check_clients',int(self.slow_check_clients))
894            if DEBUG:
895                if USE_FILE_EVENT_DB:
896                    max_event=len(self.last_events)
897                    if max_event > 20:
898                        max_event = 20
899                    for i in range(0,max_event):
900                        self.cfg.store('event{:02d}'.format(i),self.last_events[i])
901                self.last_events=self.last_events[-20:]
902                if THREADED:
903                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
904                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
905                else:
906                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
907                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
908
909    def get_q_sizes(self):
910        sizes={}
911        for x in THREADS:
912            sizes[x]=self.db[x].q.qsize()
913        return sizes
914
915    def sum_q_sizes(self):
916        sizes=self.get_q_sizes()
917        f=lambda x,y: x+y
918        return reduce(f,[sizes[x] for x in sizes])
919
920    def all_queues_empty(self):
921        sizes=self.get_q_sizes()
922        for x in sizes:
923            if sizes[x] != 0:
924                return False
925        return True
926
927    def get_load(self):
928        # runs on separate thread
929        db = DB(self,'load')
930        db.init_db()
931        self.cfg = Config(db)
932        ctime=0
933        while not (self.terminate and self.finished): #and not self.paused
934            self.cfg.store('keepalive',int(time.time()))
935            time.sleep(1)
936            self.load=self.get_cpu_load()
937            ctime+=1
938            self.schedule(event='CHECK_SANITY')
939            if ctime >CHECK_LOAD_TIME:
940                self.cfg.write()
941                ctime=0.0
942
943                self.MEM_USED=self.get_mem_usage()
944                self.server_mem=self.get_server_free_mem()
945
946                if self.server_variables and 'max_heap_table_size' in self.server_variables:
947                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
948                    if self.max_heap < WARNING_MIN_HEAP:
949                        printea("******************** MIN HEAP IS TOO SMALL -> {} < {} ********************".format(self.max_heap,WARNING_MIN_HEAP),'warning')
950
951                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='tmp_clients')
952                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='tmp_packages')
953                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='Client_Versions')
954                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='RecvPackages')
955                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
956                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
957                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
958                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
959                qempty = self.all_queues_empty()
960                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
961                    db.reset_autoinc()
962                self.print_stats()
963                #self.load=os.getloadavg()[0]
964                if qempty:
965                    gc.collect()
966            #end if
967        #end while
968        db.close_db()
969
970    def start(self):
971        self.prepare_threads()
972
973    def end(self):
974        for x in self.db:
975            self.threads[x].join()
976            self.db[x].close_db()
977        self.finished = True
978        self.print_stats()
979
980
981class Config():
982    def __init__(self,connection):
983        self._db = connection
984        self.read()
985
986    def store(self,var,value):
987        var=var.replace(' ','_')
988        if isinstance(value,str):
989            if value.isnumeric():
990                setattr(self,var,int(value))
991            else:
992                setattr(self,var,str(value))
993        else:
994            setattr(self,var,int(value))
995
996    def write(self):
997        values={}
998        for x in self.get_internal_vars():
999            values.setdefault(str(x)[:20],str(getattr(self,x))[:45])
1000        if self._db:
1001            self._db.put_config(values=values)
1002        the_string="CONFIGURATION\n"
1003        if USE_CONFIG_FILE_LOG:
1004            fnbuild = lambda kv : str(kv[0])+'='+str(kv[1])+"\n"
1005            the_string+=reduce((lambda x,y: x+y) ,map(fnbuild,values.items()))
1006            try:
1007                with open(CONFIG_FILE_LOG,'w') as fp:
1008                    fp.write(the_string)
1009            except:
1010                pass
1011
1012    def read(self):
1013        if self._db:
1014            config=self._db.get_config()
1015            for key in config.keys():
1016                if config[key].isnumeric():
1017                    setattr(self,key,int(config[key]))
1018                else:
1019                    setattr(self,key,config[key])
1020        else:
1021            printea('No config yet')
1022
1023    def get_internal_vars(self):
1024        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
1025
1026    def print(self):
1027        for v in self.get_internal_vars():
1028            print('{} = {}'.format(v,getattr(self,v)))
1029
1030
1031def main(*args,**kwargs):
1032    gc.enable()
1033    if DAEMON:
1034        fp = open('/var/run/analyticsd.pid','w');
1035        fp.write(str(os.getpid()))
1036        fp.close()
1037    m = Monitor()
1038    m.start()
1039    printea("start done",'info')
1040    while not m.terminate:
1041        time.sleep(0.5)
1042    m.end()
1043    printea("Exitting...",'info')
1044
1045if __name__ == "__main__":
1046    exit = 0
1047    keyword='analyticsd'
1048    interpreter='python3'
1049    for proc in psutil.process_iter():
1050        a=False
1051        b=False
1052        try:
1053            cmd=proc.cmdline()
1054        except:
1055            cmd=proc.cmdline
1056        for argument in cmd:
1057            #print('{} {} {}'.format(proc.cmdline,keyword,argument[-len(keyword):]))
1058            if interpreter in argument[-len(interpreter):]:
1059                a = True
1060            if keyword in argument[-len(keyword):]:
1061                b = True
1062            if a and b:
1063                exit = exit +1
1064    if exit > 1:
1065        printea('Another daemon is running','error')
1066        sys.exit(1)
1067
1068    lck = '/var/run/analyticsd'
1069    if DAEMON:
1070        if os.path.isfile(lck+'.lock'):
1071            printea('Lockfile {} detected, unable to start'.format(lck),'error')
1072            sys.exit(1)
1073        else:
1074            try:
1075                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
1076                    main()
1077            except Exception as e:
1078                printea(e)
1079                sys.exit(1)
1080    else:
1081        main()
Note: See TracBrowser for help on using the repository browser.