source: lliurex-analytics-server/trunk/fuentes/lliurex-analytics-server/usr/sbin/analyticsd @ 7150

Last change on this file since 7150 was 7150, checked in by mabarracus, 2 years ago

Ltsp graph implementation

  • Property svn:executable set to *
File size: 41.2 KB
Line 
1#!/usr/bin/env python3
2
3import time
4#import threading
5#from multiprocessing.dummy import Pool as ThreadPool
6from multiprocessing.dummy import Process,Lock
7#import multiprocessing
8import sys
9import MySQLdb as mdb
10import signal
11import os
12import queue
13import hashlib
14import gc
15import os
16import psutil
17from functools import wraps
18import re
19import daemon
20import lockfile
21import logging
22import logging.handlers
23from logging import config
24from functools import reduce
25import traceback
26
27##### START EDITABLE VARS #####
28
29DEBUG=False
30MIN_LOG_LEVEL=logging.INFO
31DAEMON=True
32FILE='/usr/lib/analytics-server/analytics/config.php'
33LIMIT=70.0       # PERCENTAGE LOAD LIMIT
34WARNING_MIN_HEAP = 256
35##### END EDITABLE VARS #####
36
37DEBUG_PRINT_ALIVE_COUNTER=60
38THREADED=True
39MAX_SLOW_CHECK_CLIENTS = 30
40USE_FILE_EVENT_LOG=True
41USE_FILE_EVENT_DB=False
42USE_CONFIG_FILE_LOG=True
43CONFIG_FILE_LOG='/var/run/analyticsd_config.log'
44FILE_EVENT_LOG='/var/run/analyticsd_event.log'
45
46if DEBUG:
47    if MIN_LOG_LEVEL:
48        loglevel=MIN_LOG_LEVEL
49    else:
50        loglevel=logging.DEBUG
51else:
52    loglevel=logging.INFO
53
54LOGGING = {
55    'version': 1,
56    'disable_existing_loggers': False,
57    'formatters': {
58        'verbose': {
59            'format': '%(levelname)s %(module)s %(message)s'
60            #'format': '%(levelname)s %(module)s P%(process)d T%(thread)d %(message)s'
61            },
62        },
63    'handlers': {
64        'stdout': {
65            'class': 'logging.StreamHandler',
66            'stream': sys.stdout,
67            'formatter': 'verbose',
68            },
69        'sys-logger6': {
70            'class': 'logging.handlers.SysLogHandler',
71            'address': '/dev/log',
72            'facility': "local6",
73            'formatter': 'verbose',
74            },
75        },
76    'loggers': {
77        'analyticsd-logger': {
78            'handlers': ['sys-logger6','stdout'],
79            'level': loglevel,
80            'propagate': True,
81            },
82        }
83    }
84
85def to_str(x):
86    if isinstance(x,str):
87        return to_utf(x).decode('unicode_escape')
88    else:
89        return x
90
91def to_utf(x):
92    if isinstance(x,str):
93        return x.encode('utf-8')
94    else:
95        return x
96
97def printea(msg="",level='critical'):
98    if level == 'critical':
99        logger.critical(msg)
100    elif level == 'error':
101        logger.error(msg)
102    elif level == 'warning':
103        logger.warning(msg)
104    elif level == 'info':
105        logger.info(msg)
106    else:
107        logger.debug(msg)
108
109def keepalive(who=""):
110    global DEBUG_PRINT_ALIVE_COUNTER
111    t=str(int(time.time()))
112    if DEBUG:
113        pass
114        # too much verbose
115        # printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'debug')
116    else:
117        if who == 'main':
118            if DEBUG_PRINT_ALIVE_COUNTER > 0:
119                DEBUG_PRINT_ALIVE_COUNTER=DEBUG_PRINT_ALIVE_COUNTER-1
120            else:
121                DEBUG_PRINT_ALIVE_COUNTER=60
122                printea('on {} analyticsd({}) is alive, all good :-)'.format(t,who),'info')
123
124    fp = open('/var/run/analyticsd.keepalive','w');
125    fp.write(t)
126    fp.close()
127
128config.dictConfig(LOGGING)
129logger = logging.getLogger('analyticsd-logger')
130
131DBNAME=None
132USER=None
133PASS=None
134IP=None
135
136EMPTY_PAUSED_SLEEP=1
137CHECK_LOAD_TIME=5
138MAX_RETRIES=5
139TIMED_ON=False
140
141try:
142    with open(FILE,'r') as f:
143        for line in f:
144            if not IP:
145                IP = re.search("^\s*[$]dbhost\s*=\s*'(\w+)'\s*;\s*$",line)
146                if IP:
147                    IP = IP.group(1)
148            if not DBNAME:
149                DBNAME = re.search("^\s*[$]dbname\s*=\s*'(\w+)'\s*;\s*$",line)
150                if DBNAME:
151                    DBNAME = DBNAME.group(1)
152            if not USER:
153                USER = re.search("^\s*[$]dbuser\s*=\s*'(\w+)'\s*;\s*$",line)
154                if USER:
155                    USER = USER.group(1)
156            if not PASS:
157                PASS = re.search("^\s*[$]dbpass\s*=\s*'(\w+)'\s*;\s*$",line)
158                if PASS:
159                    PASS = PASS.group(1)
160    if not (IP or DBNAME or USER or PASS):
161        printea("Couldn't get database configuration from {}".format(FILE))
162    else:
163        printea("Using IP:{} DBNAME:{} USER:{} PASS:{}".format(IP,DBNAME,USER,PASS),'debug')
164except Exception as e:
165    printea("Couldn't parse {} Error:\"{}\"".format(FILE,str(e)))
166    sys.exit(1)
167
168if not (IP or DBNAME or USER or PASS):
169    printea("Couldn't get database configuration from {}".format(FILE))
170    sys.exit(1)
171
172if THREADED:
173    THREADS=['main','server','client','desktop','other']
174else:
175    THREADS=['main']
176
177class DB():
178    def __init__(self,mon,t='main'):
179        self._initialized=False
180        self.t=t
181        self.reconnect=0
182        self.empty = False
183        self.conn=None
184        self.mon=mon
185        self.q=queue.Queue()
186        self.processed=0
187        self.need_clean = False
188        printea('Database worker {} initialized'.format(t),'info')
189
190    def timed(func):
191        @wraps(func)
192        def wrapper(*args,**kwargs):
193            if TIMED_ON:
194                printea("Start({}): @{}".format(func.__name__,time.time()),'debug')
195            ret=func(*args,**kwargs)
196            if TIMED_ON:
197                printea("End  ({}): @{}".format(func.__name__,time.time()),'debug')
198            return ret
199        return wrapper
200
201    def with_retry(func):
202        @wraps(func)
203        def wrapper(*args,**kwargs):
204            if 'retry' not in kwargs:
205                kwargs['retry']=1
206            if 'mon' not in kwargs:
207                kwargs['mon'] = None
208            try:
209                return func(*args,**kwargs) 
210            except Exception as e:
211                printea("Error in ({}) retry={}/{}: {}".format(func.__name__,kwargs['retry'],MAX_RETRIES,str(e)))
212                if kwargs['retry'] == MAX_RETRIES or (kwargs['mon'] and kwargs['mon'].terminate):
213                    printea("Fatal error in ({}), max retries exceded".format(func.__name__))
214                    if kwargs['mon']:
215                        kwargs['mon'].term()
216                        sys.exit(1)
217                    return None
218                else:
219                    time.sleep(kwargs['retry']**2)
220                    kwargs['retry']+=1
221                    return wrapper(*args,**kwargs)
222            return result
223        return wrapper
224
225    def with_debug(func):
226        @wraps(func)
227        def wrapper(*args,**kwargs):
228            if 'query' in kwargs:
229                printea("executing query: {}".format(kwargs['query']),'debug')
230            return func(*args,**kwargs)
231        return wrapper
232
233    @with_debug
234    def execute(self,*args,**kwargs):
235        if 'query' not in kwargs:
236            printea("Warning execute called whithout query",'info')
237            return None
238        try:
239            return self.cur.execute(kwargs['query'])
240        except mdb.OperationalError as e:
241                printea('({}) Operational error on mysql, error is: {}'.format(self.t,e),'error')
242                self.init_db()
243        except Exception as e:
244                raise Exception('Error executing: Error=({}) Query=({}) '.format(str(e),kwargs['query']))
245
246    def init_db(self):
247        try:
248            self.conn = mdb.connect(IP,USER,PASS,DBNAME)
249            self.conn.autocommit(False)
250            self.cur = self.conn.cursor()
251            self.cur.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
252            variables=self.check_server_variables()
253
254            if variables:
255                self.mon.server_variables=variables
256
257            printea("Connected succesfully {} thread_id={}".format(self.t,self.conn.thread_id()),'info')
258
259        except mdb.Error as e:
260            printea("Error {}: {}".format(e.args[0],e.args[1]))
261            raise Exception(e)
262
263    def check_server_variables(self):
264        if self.t != 'main':
265            return None
266        else:
267            printea('Getting server variables','info')
268            result = {}
269            self.cur.execute("show global variables")
270            printea('Setting {} vars'.format(self.cur.rowcount),'debug')
271            if self.cur.rowcount > 0:
272                for i in range(self.cur.rowcount):
273                    var_name, var_value = self.cur.fetchone()
274                    result.setdefault(var_name,var_value)
275            return result
276
277    def get_config(self):
278        values={}
279        self.cur.execute('select name,value from Config')
280        for val in self.cur.fetchall():
281            values.setdefault(val[0],val[1])
282        return values
283
284    @with_retry
285    def put_config(self,*args,**kwargs):
286        values = kwargs.get('values')
287        vals=[]
288        for x in values.keys():
289            vals.append("('{}','{}')".format(x,values[x]))
290        try:
291            self.execute(query="insert into Config(`name`, `value`) values {} on duplicate key update name=VALUES(name), value=VALUES(value);".format(','.join(vals)))
292            self.conn.commit()
293        except Exception as e:
294            printea(e,'error')
295
296    @with_retry
297    def check_temporary_tables_size(self,*args,**kwargs):
298        table_name=kwargs.get('table_name')
299        if self.t != 'load':
300            return None
301        else:
302            size = 0
303            rows = 0
304            self.execute(query='select floor((data_length + index_length) / 1048576) `size`,table_rows from information_schema.TABLES where table_schema = "analytics" and table_name = "{}";'.format(table_name))
305
306            res=self.cur.fetchone()
307            size = float(res[0])
308            rows = int(res[1])
309            return (int(size),rows)
310
311    def close_db(self):
312        if self.conn:
313            self.conn.close()
314            printea("Closed connection {}".format(self.t),'info')
315
316    def reduce_flavour(self,version,flavour):
317        if version == '15':
318            if 'server' not in flavour and 'client' not in flavour and 'desktop' in flavour:
319                return 'desktop'
320            elif 'server' in flavour:
321                return 'server'
322            elif 'client' in flavour:
323                return 'client' 
324        elif version == '16':
325            if 'server' in flavour:
326                return 'server'
327            elif 'client' in flavour:
328                return 'client'
329            elif 'desktop' in flavour:
330                return 'desktop'
331        return 'other'
332
333    def reduce_version(self,version):
334        if version[0:2] in ['15','16']:
335            return version[0:2]
336        else:
337            return 'other'
338
339    def gen_uuid(self,*args,**kwargs):
340        try:
341            string=to_utf(u'-'.join([str(x) for x in args]))
342            h=hashlib.sha1(string)
343            return int(h.hexdigest()[0:16],16)
344        except Exception as e:
345            printea(traceback.format_exc(),'critical')
346
347    @timed
348    @with_retry
349    def get_client(self,*args,**kwargs):
350        try:
351            query="SELECT id,date,user,version,sabor,arch,mem,vga,cpu,ncpu,ltsp,mode from tmp_clients where status=1 LIMIT {}".format(int(self.mon.select_window))
352            self.execute(query=query)
353            ret =[]
354            if self.cur.rowcount > 0:
355                for i in range(self.cur.rowcount):
356                    v_id,v_date,v_user,v_version,v_flavour,v_arch,v_mem,v_vga,v_cpu,v_ncpu,v_ltsp,v_mode=self.cur.fetchone()
357                    version=self.reduce_version(v_version)
358                    flavour=self.reduce_flavour(version,v_flavour)
359                    uuid = self.gen_uuid(v_date.month,v_date.year,v_user,v_version,v_flavour)
360                    if not v_arch:
361                        v_arch = 'NULL'
362                    if not v_mem:
363                        v_mem = 'NULL'
364                    if not v_vga:
365                        v_vga = 'NULL'
366                    if not v_cpu:
367                        v_cpu = 'NULL'
368                    if not v_ncpu:
369                        v_ncpu = 'NULL'
370                    if v_ltsp == '1' or v_ltsp == True:
371                        v_ltsp = 'TRUE'
372                    elif v_ltsp == '0' or v_ltsp == False:
373                        v_ltsp = 'FALSE'
374                    else:
375                        v_ltsp = 'NULL'
376                    if not v_mode:
377                        v_mode='NULL'
378                    else:
379                        v_mode="'"+v_mode+"'"
380
381                    ret.append({'uuid':uuid,'id':v_id,'date':v_date,'uid':v_user,'version':version,'flavour':flavour,'rversion':v_version,'rflavour':v_flavour,'arch':v_arch,'mem':v_mem,'vga':v_vga,'cpu':v_cpu,'ncpu':v_ncpu,'ltsp':v_ltsp,'mode':v_mode})
382                return ret
383            else:
384                return True
385        except Exception as e:
386            raise Exception("Error getting client: {}".format(e))
387
388    @timed
389    @with_retry
390    def get_apps(self,*args,**kwargs):
391        if 'clients' not in kwargs:
392            printea("Warning executed without named parameter clients",'info')
393            return None
394        ret = {}
395        try:
396            cids_to_rel_fla={}
397            for c in kwargs['clients']:
398                if c['id'] not in cids_to_rel_fla:
399                    cids_to_rel_fla[c['id']]=(c['version'],c['flavour'])
400                if c['flavour'] not in ret:
401                    ret[c['flavour']]={'apps':[],'clients':[],'timestamp':None}
402            for c in kwargs['clients']:
403                ret[c['flavour']]['clients'].append(c)
404
405            query = "SELECT client,date,app,value FROM tmp_packages WHERE client IN ({}) ".format(','.join(map(str,cids_to_rel_fla.keys())))
406            self.execute(query=query)
407            if self.cur.rowcount > 0:
408                for i in range(self.cur.rowcount):
409                    row = self.cur.fetchone()
410                    clid = row[0]
411                    rel,fla = cids_to_rel_fla[clid]
412                    uuid = self.gen_uuid(row[1].month,row[1].year,rel,fla,row[2])
413                    ret[fla]['apps'].append({'uuid':uuid,'release':rel,'flavour':fla,'id':clid,'date':row[1],'app':row[2],'value':row[3]})
414            return ret
415        except Exception as e:
416            raise Exception("Error getting apps: {}".format(e))
417
418    @timed
419    @with_retry
420    def put_client(self,*args,**kwargs):
421        if 'client_list' not in kwargs:
422            raise Exception("Called without named parameter client_list")
423        values= []
424        for cli in kwargs['client_list']:
425            values.append("({},'{}','{}','{}','{}','{}','{}','{}',{},'{}','{}',{},{},{})".format(cli['uuid'],cli['date'].strftime('%Y-%m-%d'),cli['uid'],cli['rversion'],cli['rflavour'],cli['version'],cli['flavour'],cli['arch'],cli['mem'],cli['vga'],cli['cpu'],cli['ncpu'],cli['ltsp'],cli['mode']))
426        query = "INSERT INTO Client_Versions(`uuid`,`date`,`Client_uid`,`string_release`,`string_flavour`,`Releases_name`,`Flavours_name`,`arch`,`mem`,`vga`,`cpu`,`ncpu`,`ltsp`,`mode`) VALUES {} on duplicate key update uuid=uuid".format(','.join(map(str,values)))
427        self.execute(query=query)
428        return True
429
430    @timed
431    @with_retry
432    def put_apps(self,*args,**kwargs):
433        if 'apps' not in kwargs:
434            raise Exception("Called without named parameter apps")
435        app_list = {}
436        for app in kwargs['apps']:
437            #uuid = self.gen_uuid(app['date'].month,app['date'].year,app['release'],app['flavour'],app['app'])
438            if str(app['uuid']) not in app_list:
439                app_list[str(app['uuid'])]={'uuid':app['uuid'],'date':app['date'],'release':app['release'],'flavour':app['flavour'],'app':app['app'],'value':app['value']}
440            else:
441                app_list[str(app['uuid'])]['value']+=app['value']
442        values = []
443        for app in app_list:
444            item=app_list[app]
445            values.append("({},'{}','{}','{}','{}',{})".format(item['uuid'],item['date'].strftime('%Y-%m-%d'),item['release'],item['flavour'],item['app'],item['value']))
446        query = "INSERT INTO RecvPackages(`uuid`,`date`,`Releases_name`,`Flavours_name`,`string`,`count`) VALUES {} ON DUPLICATE KEY UPDATE count = count + VALUES(count)".format(','.join(map(str,values)))
447        self.execute(query=query)
448        return True
449
450    @timed
451    @with_retry
452    def del_client(self,*args,**kwargs):
453        if 'client_list' not in kwargs:
454            raise Exception("Called without named parameter client_list")
455        query = "DELETE FROM tmp_clients WHERE id IN ({}) and status=1".format(','.join(map(str,kwargs['client_list'])))
456        self.execute(query=query)
457        return True
458
459    @timed
460    @with_retry
461    def del_apps(self,*args,**kwargs):
462        if 'client_list' not in kwargs:
463            raise Exception("Called without named parameter client_list")
464        query = "DELETE FROM tmp_packages WHERE client IN ({})".format(','.join(map(str,kwargs['client_list'])))
465        self.execute(query=query)
466        return True
467
468    @timed
469    def reset_autoinc(self):
470        try:
471            query = "SELECT AUTO_INCREMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = 'tmp_packages'".format(DBNAME)
472            self.execute(query=query)
473            ainc = self.cur.fetchone()[0]
474            #query = "SELECT count(*) from tmp_clients"
475            #self.execute(query=query)
476            #cli_size=self.cur.fetchone()[0]
477            #query = "SELECT count(*) from tmp_packages"
478            #self.execute(query=query)
479            #pkg_size=self.cur.fetchone()[0]
480            #printea("Cleaning memory tables: autoinc={} count_tmp_clients={} count_tmp_packages={} queues_empty={}".format(ainc,cli_size,pkg_size,self.mon.all_queues_empty()),'debug')
481            if ainc > 65500 and self.mon.all_queues_empty():
482            #and cli_size == 0 and pkg_size == 0:
483                query = "TRUNCATE TABLE tmp_clients"
484                self.execute(query=query)
485                query = "TRUNCATE TABLE tmp_packages"
486                self.execute(query=query)
487            return True
488        except Exception as e:
489            raise Exception("Error reseting auto_increment: {}".format(e))
490
491
492    @timed
493    def process_main_thread(self,*args,**kwargs):
494        if self.mon.slow_check_clients > 0 :
495            self.mon.slow_check_clients -= 1
496            time.sleep(1)
497            return True
498        else:
499            self.mon.slow_check_clients = self.mon.default_slow_check_clients
500
501        clis=self.get_client(mon=self.mon)
502        if clis == True: #No clients found (empty)
503            self.empty = True # if > 65500 auto_increment was reset
504            self.mon.schedule(event='NO_MORE_CLIENTS')
505            return False
506        #clients found
507        self.mon.schedule(event='HAVE_CLIENTS',nclients=len(clis))
508        # if clients found get apps
509        lapps = self.get_apps(clients=clis,mon=self.mon)
510        #lapps can be empty
511        self.del_client(client_list=[cli['id'] for cli in clis],error_func=self.mon.term)
512        #If deletion was failed , thread was died
513        lapps_tmp={'apps':[],'clients':[]}
514        for fla in lapps:
515            if THREADED:
516                lapps[fla]['timestamp']=time.time()
517                self.mon.db[fla].q.put(lapps[fla],True)
518            else:
519                lapps_tmp['apps'].extend(lapps[fla]['apps'])
520                lapps_tmp['clients'].extend(lapps[fla]['clients'])
521                self.mon.db['main'].q.put(lapps_tmp,True)
522        #if DEBUG:
523        self.processed+=len(clis)
524        return True
525
526    @timed
527    def process_all_threads(self,*args,**kwargs):
528        lapps=self.q.get(True)
529        #print "Running {}".format(self.t)
530        if THREADED:
531            while (lapps['timestamp'] > self.mon.commited):
532                time.sleep(0.001)
533        if len(lapps['clients']) != 0:
534            printea('Thread {} putting client'.format(self.t),'debug')
535            #IF FAIL, AFTER RETRIES THREAD DIES
536            if not self.put_client(client_list=lapps['clients'],mon=self.mon):
537                self.q.put(lapps,True) # USELESS
538                return False    # USELESS
539        if len(lapps['apps']) != 0:
540            printea('Thread {} putting clientapps'.format(self.t),'debug')
541            if not (self.put_apps(apps=lapps['apps'],mon=self.mon)):
542                self.q.put(lapps,True) # USELESS
543                return False    # USELESS
544            if not self.del_apps(client_list=[ cli['id'] for cli in lapps['clients']],mon=self.mon):
545                self.q.put(lapps,True) # USELESS
546                return False    # USELESS
547        #if DEBUG:
548        self.processed+=len(lapps['clients'])
549        return True
550
551    def process(self,*args,**kwargs):
552        keepalive(self.t)
553        # warning too much verbose, printea("Running thread {}".format(self.t),'debug')
554        if self.t == 'main' and not self.mon.terminate:
555            ret=self.process_main_thread(*args,**kwargs)
556            if ret == False: #No more clients available
557                return True
558            if ret == True:  #Clients was put on queues, need process more without waiting, main queue always empty
559                if THREADED:
560                    return True #Need return to avoid put empty flag and wait into main thread
561
562        #after this poing, code for all threads
563        if not self.q.empty():
564            ret=self.process_all_threads(*args,**kwargs)
565            if ret == False: # USELESS? THREADS was died
566                printea("Error threads")
567                return ret
568        else: 
569            del self.q
570            self.q = queue.Queue()
571            self.empty = True
572        return True
573
574    def worker(self):
575        printea("Starting worker {} processing".format(self.t),'info')
576        while not (self.mon.terminate and self.empty):
577            if self.mon.paused or self.empty:
578                #if self.empty and self.t == 'main':
579                #    self.reset_autoinc()
580                if self.mon.paused:
581                    printea("Paused by high load {}".format(self.t),'debug')
582                # Too much verbose
583                #if self.empty:
584                #    printea("Empty queue {} sleeping by now".format(self.t),'debug')
585                if self.empty:
586                    self.empty = False
587                time.sleep(EMPTY_PAUSED_SLEEP)
588            else:
589                try:
590                    if self.conn == None:
591                        if not self._initialized:
592                            self._initialized = True
593                            raise EnvironmentError('Initializing connector {}'.format(self.t))
594                        else:
595                            raise Warning('Connection not available')
596                    else:
597                        self.conn.begin()
598                    if self.process():
599                        self.conn.commit()
600                        self.mon.commited=time.time()
601                        self.reconnect = 0
602                        if self.need_clean:
603                            gc.collect()
604                    else:
605                        self.conn.rollback()
606                except EnvironmentError as e:
607                    printea(e,'info')
608                    self.init_db()
609                except Warning as e:
610                    printea(e,'warning')
611                    self.init_db()
612                except Exception as e:
613                    try:
614                        if self.conn != None:
615                            self.conn.rollback()
616                    except:
617                        printea("Can't rollback last actions",'info')
618                        pass
619                    #if e[0] != 2006:
620                    #    printea("Exception processing worker({}): {}".format(self.t,e))
621                    #if e[0] == 2006: # SERVER GONE AWAY
622                    #    printea("Worker ({}) detected server gone away !!!: {}".format(self.t,e))
623
624                    printea("Trying to recover connection ({}), {}".format(self.t,e))
625                    if self.reconnect == 100:
626                        printea("Worker ({}) says: lost connection to database, reconnection not possible, terminating all processes".format(self.t))
627                        self.mon.term()
628                    else:
629                        self.reconnect+=1
630                        printea('Reconnect({}) to mysql sleeping({})'.format(self.reconnect,self.reconnect*self.reconnect))
631                        time.sleep(self.reconnect*self.reconnect)
632
633                        try:
634                            self.init_db()
635                            printea("Recovered worker {} connection".format(self.t),'info')
636                        except:
637                            printea('Unable to initialize worker {}'.format(self.t))
638                            pass
639
640
641class Monitor():
642    def __init__(self):
643        self.MAX_QUEUE_UTILIZATION = 100
644        self.USE_MAX_QUEUES = True
645        self.MAX_SELECT_WINDOW = (2 ** 13) +1
646        self.MIN_SELECT_WINDOW = 32
647        self.MEM_USED=0
648        self.MAX_MEM=512
649        self.MIN_FREE_MEM_SERVER=100
650        self.USE_MAX_MEM=True
651        self.lock = Lock()
652        self.terminate = False
653        self.finished = False
654        self.paused = False
655        self.select_window = self.MIN_SELECT_WINDOW
656        self.commited = time.time()
657        self.procesed = 0
658        self.procesed_per_sec = [0]*10
659        self.load = 0
660        self.server_variables = None  # initialized by main worker
661        self.temporary_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
662        self.temporary_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
663        self.db_tables_size = { 'clients' : 0.0, 'packages':0.0 ,'sum': 0.0 }
664        self.db_tables_rows = { 'clients': 0, 'packages' : 0 , 'sum': 0 }
665        self.default_slow_check_clients = 1
666        self.slow_check_clients = self.default_slow_check_clients
667        self.server_mem = 0
668        self.loadlist = [ 0.0 ] * 100
669        self.max_heap = None
670        self.heap_alert_count = 0
671        self.last_events = []
672
673        signal.signal(signal.SIGQUIT,self.term)
674        signal.signal(signal.SIGTERM,self.term)
675        signal.signal(signal.SIGINT,self.term)
676
677        self.db = {}
678        self.threads = {}
679        for x in THREADS:
680            self.db[x] = DB(self,x)
681            #try:
682            #    self.db[x].init_db()
683            #except Exception as e:
684            #    printea('Error initializing database connections: {}'.format(str(e)))
685            #    sys.exit(1)
686        self.cfg= None
687        printea("Monitor initialized with {} threads".format(len(THREADS)),'info')
688
689    def windowctl(self, *args, **kwargs):
690        if args[0] == '+':
691            if self.select_window*2 < self.MAX_SELECT_WINDOW:
692                self.select_window*=2
693                self.select_window=int(self.select_window)
694        if args[0] == '-':
695            if self.select_window > self.MIN_SELECT_WINDOW:
696                self.select_window/=2
697                self.select_window=int(self.select_window)
698
699    def slowcheckctl(self, *args, **kwargs):
700        if args[0] == 'reset':
701            self.default_slow_check_clients = 0
702        if args[0] == '+':
703            if self.default_slow_check_clients < MAX_SLOW_CHECK_CLIENTS:
704                if self.default_slow_check_clients == 0:
705                    self.default_slow_check_clients = 1
706                else:
707                    self.default_slow_check_clients = self.default_slow_check_clients * 2
708        if args[0] == '-':
709            if self.default_slow_check_clients > 0:
710                self.default_slow_check_clients = self.default_slow_check_clients / 2
711
712    def append_event_log(self,*args,**kwargs):
713        if DEBUG:
714            if USE_FILE_EVENT_LOG:
715                try:
716                    fp = open(FILE_EVENT_LOG,'a')
717                except:
718                    fp = False
719            else:
720                fp = False
721            separator=':'
722            for x in args:
723                the_time=str(int(time.time()))
724                the_value=str(x)
725                the_string=the_time+separator+the_value
726                self.last_events.append(the_string)
727                if fp:
728                    fp.write('{}\n'.format(the_string))
729            if fp:
730                fp.close()
731
732    def schedule(self, *args, **kwargs):
733        if kwargs['event'] == 'NO_MORE_CLIENTS':
734            self.append_event_log(kwargs['event'])
735            self.windowctl('-')
736            self.slowcheckctl('+')
737
738        if kwargs['event'] == 'HAVE_CLIENTS':
739            self.append_event_log(kwargs['event'])
740            if kwargs['nclients'] == self.select_window:
741                self.windowctl('+')
742            else:
743                self.windowctl('-')
744            self.slowcheckctl('reset')
745#
746#                if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
747#                    self.windowctl('+')
748#
749#                if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
750#                    self.windowctl('-')
751#                    self.slowcheckctl('+')
752#                else:
753#                    self.windowctl('+')
754#
755#            elif kwargs['nclients'] < self.select_window/2:
756#                self.windowctl('-')
757#
758#            self.slowcheckctl('reset')
759
760        if kwargs['event']=='CHECK_SANITY':
761            # CPU SETTINGS
762            if not self.terminate and self.load > LIMIT:
763                self.append_event_log('LOAD_LIMIT_REACHED')
764                self.paused = True
765            else:
766                self.paused = False
767
768            # DB MEM SETTINGS
769            if self.max_heap and self.temporary_tables_size['sum']:
770                if self.temporary_tables_size['sum'] > self.max_heap * 0.3:
771                    self.heap_alert_count += 1
772                    self.append_event_log('MAX_HEAP_ALERT')
773                    self.windowctl('+')
774                    self.slowcheckctl('-')
775                    if self.heap_alert_count > 50:
776                        self.append_event_log('EXTREME_HEAP_RULES')
777                        self.slowcheckctl('reset')
778                    if self.paused:
779                        #printea('Hitting max temporary table size unpausing','critical')
780                        self.paused = False
781                else:
782                    self.heap_alert_count=0
783            # SERVER MEM
784            if self.server_mem and self.server_mem < self.MIN_FREE_MEM_SERVER:
785                self.append_event_log('SERVER_MEM_ALERT')
786                #printea('Hitting max memory from server collecting and reducing window','critical')
787                self.windowctl('-')
788                self.slowcheckctl('+')
789                self.USE_MAX_QUEUES=True
790                for x in THREADS:
791                    self.db[x].need_clean=True
792                gc.collect()
793            else:
794#                self.USE_MAX_QUEUES=False
795                for x in THREADS:
796                    self.db[x].need_clean=False
797
798            if self.USE_MAX_MEM and self.MEM_USED > self.MAX_MEM:
799                self.append_event_log('MAX_MEM_ALERT')
800                self.windowctl('-')
801                self.slowcheckctl('+')
802
803            # QUEUES
804            if self.USE_MAX_QUEUES and self.sum_q_sizes() > self.MAX_QUEUE_UTILIZATION:
805                self.append_event_log('MAX_QUEUES_REACHED')
806                self.windowctl('+')
807
808
809    def get_cpu_load(self):
810        self.loadlist.append(psutil.cpu_percent())
811        self.loadlist=self.loadlist[1:]
812        avg=0.0
813        for x in self.loadlist:
814            avg+=x
815        return round(avg/100.0,2)
816
817    def term(self,*args,**kwargs):
818        printea("Begin kill the program, wait please...",'info')
819        self.terminate=True
820
821    def prepare_threads(self):
822        global DAEMON
823
824        self.threads['load']=Process(target=self.get_load)
825        self.threads['load'].daemon = DAEMON
826        self.threads['load'].start()
827        for x in THREADS:
828            self.threads[x]=Process(target=self.db[x].worker)
829            self.threads[x].daemon = DAEMON
830            self.threads[x].start()
831
832    def get_mem_usage(self):
833        process = psutil.Process(os.getpid())
834        mem = process.memory_info()[0] / float(2 ** 20)
835        return mem
836
837    def get_server_free_mem(self):
838        mem = psutil.virtual_memory()
839        return mem.free / (2 ** 20)
840
841    def print_stats(self):
842        global CHECK_LOAD_TIME
843#        if DEBUG:
844        if True:
845            out="Processed: "
846            out2=''
847            total=0
848            for x in THREADS:
849                out2+='{}={} '.format(x,self.db[x].processed)
850                self.cfg.store('processed '+x,self.db[x].processed)
851                if THREADED:
852                    if x != 'main':
853                        total += self.db[x].processed
854                else:
855                    total += self.db[x].processed
856            out += "TOTAL={} {}".format(total,out2)
857            self.cfg.store('processed total',total)
858            proc_per_sec=int((total-self.procesed)/CHECK_LOAD_TIME)
859            self.procesed_per_sec.append(proc_per_sec)
860            self.procesed_per_sec=self.procesed_per_sec[-10:]
861            f=lambda x,y:x+y
862            suma_proc=reduce(f,self.procesed_per_sec)
863            self.cfg.store('processing at',int(suma_proc/10))
864            self.procesed=total
865            total = 0
866            if THREADED:
867                out+="Queues: "
868                sizes=self.get_q_sizes()
869                out2=''
870                for x in sizes:
871                    self.cfg.store('queue '+x,sizes[x])
872                    out2+='{}={} '.format(x,sizes[x])
873                    total += sizes[x]
874                self.cfg.store('queue totals',total)
875                out+="TOTAL={} {}".format(total,out2)
876            self.cfg.store('select window',self.select_window)
877            self.cfg.store('mem used',self.MEM_USED)
878            self.cfg.store('load',self.load)
879            if (self.paused):
880                self.cfg.store('paused',1)
881            else:
882                self.cfg.store('paused',0)
883            self.cfg.store('temp_clients_size',int(self.temporary_tables_size['clients']))
884            self.cfg.store('temp_packages_size',int(self.temporary_tables_size['packages']))
885            self.cfg.store('temp_clients_rows',int(self.temporary_tables_rows['clients']))
886            self.cfg.store('temp_packages_rows',int(self.temporary_tables_rows['packages']))
887            self.cfg.store('db_clients_size',int(self.db_tables_size['clients']))
888            self.cfg.store('db_packages_size',int(self.db_tables_size['packages']))
889            self.cfg.store('default_slow_check_clients',int(self.default_slow_check_clients))
890            self.cfg.store('slow_check_clients',int(self.slow_check_clients))
891            if DEBUG:
892                if USE_FILE_EVENT_DB:
893                    max_event=len(self.last_events)
894                    if max_event > 20:
895                        max_event = 20
896                    for i in range(0,max_event):
897                        self.cfg.store('event{:02d}'.format(i),self.last_events[i])
898                self.last_events=self.last_events[-20:]
899                if THREADED:
900                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
901                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
902                else:
903                    printea("{} {}SelectSize={} ProcessingAt={} Mem={} Load={} Paused={} Tsize={} Trows={}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED),self.load,self.paused,self.temporary_tables_size['sum'],self.temporary_tables_rows['sum']),'info')
904                    #sys.stdout.write("{}{} {}SelectSize={} ProcessingAt={} Mem={}\n".format('\r'*100,time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()),out,self.select_window,int(suma_proc/10),int(self.MEM_USED)))
905
906    def get_q_sizes(self):
907        sizes={}
908        for x in THREADS:
909            sizes[x]=self.db[x].q.qsize()
910        return sizes
911
912    def sum_q_sizes(self):
913        sizes=self.get_q_sizes()
914        f=lambda x,y: x+y
915        return reduce(f,[sizes[x] for x in sizes])
916
917    def all_queues_empty(self):
918        sizes=self.get_q_sizes()
919        for x in sizes:
920            if sizes[x] != 0:
921                return False
922        return True
923
924    def get_load(self):
925        # runs on separate thread
926        db = DB(self,'load')
927        db.init_db()
928        self.cfg = Config(db)
929        ctime=0
930        while not (self.terminate and self.finished): #and not self.paused
931            self.cfg.store('keepalive',int(time.time()))
932            time.sleep(1)
933            self.load=self.get_cpu_load()
934            ctime+=1
935            self.schedule(event='CHECK_SANITY')
936            if ctime >CHECK_LOAD_TIME:
937                self.cfg.write()
938                ctime=0.0
939
940                self.MEM_USED=self.get_mem_usage()
941                self.server_mem=self.get_server_free_mem()
942
943                if self.server_variables and 'max_heap_table_size' in self.server_variables:
944                    self.max_heap=int(self.server_variables['max_heap_table_size'])/(2**20)
945                    if self.max_heap < WARNING_MIN_HEAP:
946                        printea("******************** MIN HEAP IS TOO SMALL -> {} < {} ********************".format(self.max_heap,WARNING_MIN_HEAP),'warning')
947
948                self.temporary_tables_size['clients'],self.temporary_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='tmp_clients')
949                self.temporary_tables_size['packages'],self.temporary_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='tmp_packages')
950                self.db_tables_size['clients'],self.db_tables_rows['clients']=db.check_temporary_tables_size(mon=self,table_name='Client_Versions')
951                self.db_tables_size['packages'],self.db_tables_rows['packages']=db.check_temporary_tables_size(mon=self,table_name='RecvPackages')
952                self.temporary_tables_size['sum'] =self.temporary_tables_size['clients']+self.temporary_tables_size['packages']
953                self.temporary_tables_rows['sum'] =self.temporary_tables_rows['clients']+self.temporary_tables_rows['packages']
954                self.db_tables_size['sum'] =self.db_tables_size['clients']+self.db_tables_size['packages']
955                self.db_tables_rows['sum'] =self.db_tables_rows['clients']+self.db_tables_rows['packages']
956                qempty = self.all_queues_empty()
957                if self.temporary_tables_rows['clients'] == 0 and self.temporary_tables_rows['packages'] == 0 and qempty:
958                    db.reset_autoinc()
959                self.print_stats()
960                #self.load=os.getloadavg()[0]
961                if qempty:
962                    gc.collect()
963            #end if
964        #end while
965        db.close_db()
966
967    def start(self):
968        self.prepare_threads()
969
970    def end(self):
971        for x in self.db:
972            self.threads[x].join()
973            self.db[x].close_db()
974        self.finished = True
975        self.print_stats()
976
977
978class Config():
979    def __init__(self,connection):
980        self._db = connection
981        self.read()
982
983    def store(self,var,value):
984        var=var.replace(' ','_')
985        if isinstance(value,str):
986            if value.isnumeric():
987                setattr(self,var,int(value))
988            else:
989                setattr(self,var,str(value))
990        else:
991            setattr(self,var,int(value))
992
993    def write(self):
994        values={}
995        for x in self.get_internal_vars():
996            values.setdefault(str(x)[:20],str(getattr(self,x))[:45])
997        if self._db:
998            self._db.put_config(values=values)
999        the_string="CONFIGURATION\n"
1000        if USE_CONFIG_FILE_LOG:
1001            fnbuild = lambda kv : str(kv[0])+'='+str(kv[1])+"\n"
1002            the_string+=reduce((lambda x,y: x+y) ,map(fnbuild,values.items()))
1003            try:
1004                with open(CONFIG_FILE_LOG,'w') as fp:
1005                    fp.write(the_string)
1006            except:
1007                pass
1008
1009    def read(self):
1010        if self._db:
1011            config=self._db.get_config()
1012            for key in config.keys():
1013                if config[key].isnumeric():
1014                    setattr(self,key,int(config[key]))
1015                else:
1016                    setattr(self,key,config[key])
1017        else:
1018            printea('No config yet')
1019
1020    def get_internal_vars(self):
1021        return list(filter(lambda x : x[0] != '_',self.__dict__.keys()))
1022
1023    def print(self):
1024        for v in self.get_internal_vars():
1025            print('{} = {}'.format(v,getattr(self,v)))
1026
1027
1028def main(*args,**kwargs):
1029    gc.enable()
1030    if DAEMON:
1031        fp = open('/var/run/analyticsd.pid','w');
1032        fp.write(str(os.getpid()))
1033        fp.close()
1034    m = Monitor()
1035    m.start()
1036    printea("start done",'info')
1037    while not m.terminate:
1038        time.sleep(0.5)
1039    m.end()
1040    printea("Exitting...",'info')
1041
1042if __name__ == "__main__":
1043    exit = 0
1044    keyword='analyticsd'
1045    interpreter='python3'
1046    for proc in psutil.process_iter():
1047        a=False
1048        b=False
1049        for argument in proc.cmdline():
1050            #print('{} {} {}'.format(proc.cmdline,keyword,argument[-len(keyword):]))
1051            if interpreter in argument[-len(interpreter):]:
1052                a = True
1053            if keyword in argument[-len(keyword):]:
1054                b = True
1055            if a and b:
1056                exit = exit +1
1057    if exit > 1:
1058        printea('Another daemon is running','error')
1059        sys.exit(1)
1060
1061    lck = '/var/run/analyticsd'
1062    if DAEMON:
1063        if os.path.isfile(lck+'.lock'):
1064            printea('Lockfile {} detected, unable to start'.format(lck),'error')
1065            sys.exit(1)
1066        else:
1067            try:
1068                with daemon.DaemonContext(detach_process=True,working_directory='/tmp',umask=0o002,pidfile=lockfile.FileLock(lck),files_preserve=[logger.handlers[0].socket.fileno()]):
1069                    main()
1070            except Exception as e:
1071                printea(e)
1072                sys.exit(1)
1073    else:
1074        main()
Note: See TracBrowser for help on using the repository browser.